wire_framework/service/
context.rs

1use std::any::type_name;
2
3use super::shutdown_hook::ShutdownHook;
4use crate::{
5    resource::{Resource, ResourceId, StoredResource},
6    service::{Service, named_future::NamedFuture},
7    task::Task,
8    wiring_layer::WiringError,
9};
10
11/// An interface to the service provided to the tasks during initialization.
12/// This the main point of interaction between with the service.
13///
14/// The context provides access to the runtime, resources, and allows adding new tasks.
15#[derive(Debug)]
16pub struct ServiceContext<'a> {
17    layer: &'a str,
18    service: &'a mut Service,
19}
20
21impl<'a> ServiceContext<'a> {
22    /// Instantiates a new context.
23    /// The context keeps information about the layer that created it for reporting purposes.
24    pub(super) fn new(layer: &'a str, service: &'a mut Service) -> Self {
25        Self { layer, service }
26    }
27
28    /// Provides access to the runtime used by the service.
29    ///
30    /// Can be used to spawn additional tasks within the same runtime.
31    /// If some task stores the handle to spawn additional tasks, it is expected to do all the required
32    /// cleanup.
33    ///
34    /// In most cases, however, it is recommended to use [`add_task`](ServiceContext::add_task) or its alternative
35    /// instead.
36    ///
37    /// ## Note
38    ///
39    /// While `tokio::spawn` and `tokio::spawn_blocking` will work as well, using the runtime handle
40    /// from the context is still a recommended way to get access to runtime, as it tracks the access
41    /// to the runtimes by layers.
42    pub fn runtime_handle(&self) -> &tokio::runtime::Handle {
43        tracing::info!(
44            "Layer {} has requested access to the Tokio runtime",
45            self.layer
46        );
47        self.service.runtime.handle()
48    }
49
50    /// Adds a task to the service.
51    ///
52    /// Added tasks will be launched after the wiring process will be finished and all the preconditions
53    /// are met.
54    pub fn add_task<T: Task>(&mut self, task: T) -> &mut Self {
55        tracing::info!("Layer {} has added a new task: {}", self.layer, task.id());
56        self.service.runnables.tasks.push(Box::new(task));
57        self
58    }
59
60    /// Adds a future to be invoked after node shutdown.
61    /// May be used to perform cleanup tasks.
62    ///
63    /// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
64    /// All the futures will be awaited sequentially.
65    pub fn add_shutdown_hook(&mut self, hook: ShutdownHook) -> &mut Self {
66        tracing::info!(
67            "Layer {} has added a new shutdown hook: {}",
68            self.layer,
69            hook.id
70        );
71        self.service
72            .runnables
73            .shutdown_hooks
74            .push(NamedFuture::new(hook.future, hook.id));
75        self
76    }
77
78    /// Attempts to retrieve the resource of the specified type.
79    ///
80    /// ## Panics
81    ///
82    /// Panics if the resource with the specified [`ResourceId`] exists, but is not of the requested type.
83    pub fn get_resource<T: Resource + Clone>(&mut self) -> Result<T, WiringError> {
84        // Implementation details:
85        // Internally the resources are stored as [`std::any::Any`], and this method does the downcasting
86        // on behalf of the caller.
87        #[allow(clippy::borrowed_box)]
88        let downcast_clone = |resource: &Box<dyn StoredResource>| {
89            resource
90                .downcast_ref::<T>()
91                .unwrap_or_else(|| {
92                    panic!(
93                        "Resource {} is not of type {}",
94                        T::name(),
95                        std::any::type_name::<T>()
96                    )
97                })
98                .clone()
99        };
100
101        // Check whether the resource is already available.
102        if let Some(resource) = self.service.resources.get(&ResourceId::of::<T>()) {
103            tracing::info!(
104                "Layer {} has requested resource {} of type {}",
105                self.layer,
106                T::name(),
107                type_name::<T>()
108            );
109            return Ok(downcast_clone(resource));
110        }
111
112        tracing::info!(
113            "Layer {} has requested resource {} of type {}, but it is not available",
114            self.layer,
115            T::name(),
116            type_name::<T>()
117        );
118
119        // No such resource.
120        // The requester is allowed to decide whether this is an error or not.
121        Err(WiringError::ResourceLacking {
122            name: T::name(),
123            id: ResourceId::of::<T>(),
124        })
125    }
126
127    /// Attempts to retrieve the resource of the specified type.
128    /// If the resource is not available, it is created using the provided closure.
129    pub fn get_resource_or_insert_with<T: Resource + Clone, F: FnOnce() -> T>(
130        &mut self,
131        f: F,
132    ) -> T {
133        if let Ok(resource) = self.get_resource::<T>() {
134            return resource;
135        }
136
137        // No such resource, insert a new one.
138        let resource = f();
139        self.service
140            .resources
141            .insert(ResourceId::of::<T>(), Box::new(resource.clone()));
142        tracing::info!(
143            "Layer {} has created a new resource {}",
144            self.layer,
145            T::name()
146        );
147        resource
148    }
149
150    /// Attempts to retrieve the resource of the specified type.
151    /// If the resource is not available, it is created using `T::default()`.
152    pub fn get_resource_or_default<T: Resource + Clone + Default>(&mut self) -> T {
153        self.get_resource_or_insert_with(T::default)
154    }
155
156    /// Adds a resource to the service.
157    ///
158    /// If the resource with the same type is already provided, the method will return an error.
159    pub fn insert_resource<T: Resource>(&mut self, resource: T) -> Result<(), WiringError> {
160        let id = ResourceId::of::<T>();
161        if self.service.resources.contains_key(&id) {
162            tracing::info!(
163                "Layer {} has attempted to provide resource {} of type {}, but it is already available",
164                self.layer,
165                T::name(),
166                type_name::<T>()
167            );
168            return Err(WiringError::ResourceAlreadyProvided {
169                id: ResourceId::of::<T>(),
170                name: T::name(),
171            });
172        }
173        self.service.resources.insert(id, Box::new(resource));
174        tracing::info!(
175            "Layer {} has provided a new resource {}",
176            self.layer,
177            T::name()
178        );
179        Ok(())
180    }
181}