wire_framework/service/context.rs
1use std::any::type_name;
2
3use super::shutdown_hook::ShutdownHook;
4use crate::{
5 resource::{Resource, ResourceId, StoredResource},
6 service::{Service, named_future::NamedFuture},
7 task::Task,
8 wiring_layer::WiringError,
9};
10
11/// An interface to the service provided to the tasks during initialization.
12/// This the main point of interaction between with the service.
13///
14/// The context provides access to the runtime, resources, and allows adding new tasks.
15#[derive(Debug)]
16pub struct ServiceContext<'a> {
17 layer: &'a str,
18 service: &'a mut Service,
19}
20
21impl<'a> ServiceContext<'a> {
22 /// Instantiates a new context.
23 /// The context keeps information about the layer that created it for reporting purposes.
24 pub(super) fn new(layer: &'a str, service: &'a mut Service) -> Self {
25 Self { layer, service }
26 }
27
28 /// Provides access to the runtime used by the service.
29 ///
30 /// Can be used to spawn additional tasks within the same runtime.
31 /// If some task stores the handle to spawn additional tasks, it is expected to do all the required
32 /// cleanup.
33 ///
34 /// In most cases, however, it is recommended to use [`add_task`](ServiceContext::add_task) or its alternative
35 /// instead.
36 ///
37 /// ## Note
38 ///
39 /// While `tokio::spawn` and `tokio::spawn_blocking` will work as well, using the runtime handle
40 /// from the context is still a recommended way to get access to runtime, as it tracks the access
41 /// to the runtimes by layers.
42 pub fn runtime_handle(&self) -> &tokio::runtime::Handle {
43 tracing::info!(
44 "Layer {} has requested access to the Tokio runtime",
45 self.layer
46 );
47 self.service.runtime.handle()
48 }
49
50 /// Adds a task to the service.
51 ///
52 /// Added tasks will be launched after the wiring process will be finished and all the preconditions
53 /// are met.
54 pub fn add_task<T: Task>(&mut self, task: T) -> &mut Self {
55 tracing::info!("Layer {} has added a new task: {}", self.layer, task.id());
56 self.service.runnables.tasks.push(Box::new(task));
57 self
58 }
59
60 /// Adds a future to be invoked after node shutdown.
61 /// May be used to perform cleanup tasks.
62 ///
63 /// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
64 /// All the futures will be awaited sequentially.
65 pub fn add_shutdown_hook(&mut self, hook: ShutdownHook) -> &mut Self {
66 tracing::info!(
67 "Layer {} has added a new shutdown hook: {}",
68 self.layer,
69 hook.id
70 );
71 self.service
72 .runnables
73 .shutdown_hooks
74 .push(NamedFuture::new(hook.future, hook.id));
75 self
76 }
77
78 /// Attempts to retrieve the resource of the specified type.
79 ///
80 /// ## Panics
81 ///
82 /// Panics if the resource with the specified [`ResourceId`] exists, but is not of the requested type.
83 pub fn get_resource<T: Resource + Clone>(&mut self) -> Result<T, WiringError> {
84 // Implementation details:
85 // Internally the resources are stored as [`std::any::Any`], and this method does the downcasting
86 // on behalf of the caller.
87 #[allow(clippy::borrowed_box)]
88 let downcast_clone = |resource: &Box<dyn StoredResource>| {
89 resource
90 .downcast_ref::<T>()
91 .unwrap_or_else(|| {
92 panic!(
93 "Resource {} is not of type {}",
94 T::name(),
95 std::any::type_name::<T>()
96 )
97 })
98 .clone()
99 };
100
101 // Check whether the resource is already available.
102 if let Some(resource) = self.service.resources.get(&ResourceId::of::<T>()) {
103 tracing::info!(
104 "Layer {} has requested resource {} of type {}",
105 self.layer,
106 T::name(),
107 type_name::<T>()
108 );
109 return Ok(downcast_clone(resource));
110 }
111
112 tracing::info!(
113 "Layer {} has requested resource {} of type {}, but it is not available",
114 self.layer,
115 T::name(),
116 type_name::<T>()
117 );
118
119 // No such resource.
120 // The requester is allowed to decide whether this is an error or not.
121 Err(WiringError::ResourceLacking {
122 name: T::name(),
123 id: ResourceId::of::<T>(),
124 })
125 }
126
127 /// Attempts to retrieve the resource of the specified type.
128 /// If the resource is not available, it is created using the provided closure.
129 pub fn get_resource_or_insert_with<T: Resource + Clone, F: FnOnce() -> T>(
130 &mut self,
131 f: F,
132 ) -> T {
133 if let Ok(resource) = self.get_resource::<T>() {
134 return resource;
135 }
136
137 // No such resource, insert a new one.
138 let resource = f();
139 self.service
140 .resources
141 .insert(ResourceId::of::<T>(), Box::new(resource.clone()));
142 tracing::info!(
143 "Layer {} has created a new resource {}",
144 self.layer,
145 T::name()
146 );
147 resource
148 }
149
150 /// Attempts to retrieve the resource of the specified type.
151 /// If the resource is not available, it is created using `T::default()`.
152 pub fn get_resource_or_default<T: Resource + Clone + Default>(&mut self) -> T {
153 self.get_resource_or_insert_with(T::default)
154 }
155
156 /// Adds a resource to the service.
157 ///
158 /// If the resource with the same type is already provided, the method will return an error.
159 pub fn insert_resource<T: Resource>(&mut self, resource: T) -> Result<(), WiringError> {
160 let id = ResourceId::of::<T>();
161 if self.service.resources.contains_key(&id) {
162 tracing::info!(
163 "Layer {} has attempted to provide resource {} of type {}, but it is already available",
164 self.layer,
165 T::name(),
166 type_name::<T>()
167 );
168 return Err(WiringError::ResourceAlreadyProvided {
169 id: ResourceId::of::<T>(),
170 name: T::name(),
171 });
172 }
173 self.service.resources.insert(id, Box::new(resource));
174 tracing::info!(
175 "Layer {} has provided a new resource {}",
176 self.layer,
177 T::name()
178 );
179 Ok(())
180 }
181}