1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
use super::*; /// Object-safe part of [`ComponentDefinition`]. /// /// This trait aggregates all the object-safe super-traits of [`ComponentDefinition`] to make /// trait objects possible while rust doesn't have multi-trait trait-objects. pub trait DynamicComponentDefinition: DynamicPortAccess + ActorRaw + ComponentLifecycle + Send { } impl<T> DynamicComponentDefinition for T where T: DynamicPortAccess + ActorRaw + ComponentLifecycle + Send { } /// The core trait every component must implement /// /// Should usually simply be derived using `#[derive(ComponentDefinition)]`. /// /// Only implement this manually if you need special execution logic, /// for example for custom fairness models. /// /// # Note /// /// The derive macro additionally provides implementation of /// [ProvideRef](ProvideRef) or [RequireRef](RequireRef) for each of the /// component's ports. It is generally recommended to do so as well, when not /// using the derive macro, as it enables some rather convenient APIs. pub trait ComponentDefinition: DynamicComponentDefinition where Self: Sized + 'static, { /// Prepare the component for being run /// /// You *must* call [initialise](ComponentContext::initialise) on this /// component's context instance. /// /// You *must* call [set_parent](ProvidedPort::set_parent) (or [RequiredPort::set_parent](RequiredPort::set_parent)) /// for each of the component's ports. fn setup(&mut self, self_component: Arc<Component<Self>>) -> (); /// Execute events on the component's ports /// /// You may run up to `max_events` events from the component's ports. /// /// The `skip` value normally contains the offset where the last invocation stopped. /// However, you can specify the next value when you create the returning [ExecuteResult](ExecuteResult), /// so you can custome the semantics of this value, if desired. fn execute(&mut self, max_events: usize, skip: usize) -> ExecuteResult; /// Return a reference the component's context field fn ctx(&self) -> &ComponentContext<Self>; /// Return a mutable reference the component's context field fn ctx_mut(&mut self) -> &mut ComponentContext<Self>; /// Return the name of the component's type /// /// This is only used for the logging MDC, so you can technically /// return whatever you like. It simply helps with debugging if it's related /// to the actual struct name. fn type_name() -> &'static str; /// Run a Future on this component, allowing it mutable /// access to the component's internal state on every poll. /// /// Please see the documentation for [ComponentDefinitionAccess](ComponentDefinitionAccess) /// for details on how the internal state may (and may not) be used. /// /// # Example /// /// ``` /// # use kompact::prelude::*; /// /// #[derive(ComponentDefinition, Actor)] /// struct AsyncComponent { /// ctx: ComponentContext<Self>, /// flag: bool, /// } /// impl AsyncComponent { /// fn new() -> Self { /// AsyncComponent { /// ctx: ComponentContext::uninitialised(), /// flag: false, /// } /// } /// } /// impl ComponentLifecycle for AsyncComponent { /// fn on_start(&mut self) -> Handled { /// // on nightly you can just write: async move |mut async_self| {...} /// self.spawn_local(move |mut async_self| async move { /// async_self.flag = true; /// Handled::Ok /// }); /// Handled::Ok /// } /// } /// ``` /// /// # See Also /// /// In order to suspend processing of all other messages and events while completing a /// future, use [block_on](Handled::block_on). /// /// In order to run a large future which does not need access to component's internal state /// at all or until the very end, consider using [spawn_off](ComponentDefinition::spawn_off). fn spawn_local<F>(&mut self, f: impl FnOnce(ComponentDefinitionAccess<Self>) -> F) where Self: 'static, F: futures::Future<Output = Handled> + Send + 'static, { let future = future_task::non_blocking(self, f); future.schedule(); let tag = future.tag(); self.ctx_mut().non_blocking_futures.insert(tag, future); } /// Run a Future on this system's executor pool and return a handle to the result /// /// Handles can be awaited like any other future. /// /// # Note /// /// The current API is not as efficient as calling [FuturesExecutor::spawn](executors::FuturesExecutor::spawn) /// directly, due to some trait object indirection in Kompact systems. /// Thus, if performance is important, it is recommended to maintain a (non trait-object) handle /// to the actual `Executor` pool being used and call its `spawn` function instead. /// This API is really just a somewhat roundabout convenience for doing the same. fn spawn_off<R: Send + 'static>( &self, future: impl futures::Future<Output = R> + 'static + Send, ) -> JoinHandle<R> { self.ctx().system().spawn(future) } } /// A trait to customise handling of lifecycle events /// /// This trait replaces the pre-v0.9 `Provide<ControlPort>` requirement. pub trait ComponentLifecycle: ComponentLogging { /// Gets invoked every time a component receives a Start event /// /// The default implementation simply logs something at debug level. fn on_start(&mut self) -> Handled where Self: 'static, { debug!(self.log(), "Starting..."); Handled::Ok } /// Gets invoked every time a component receives a Stop event /// /// The default implementation simply logs something at debug level. fn on_stop(&mut self) -> Handled where Self: 'static, { debug!(self.log(), "Stopping..."); Handled::Ok } /// Gets invoked every time a component receives a Kill event /// /// The default implementation simply logs something at debug level. fn on_kill(&mut self) -> Handled where Self: 'static, { debug!(self.log(), "Killing..."); Handled::Ok } } /// A mechanism for dynamically getting references to provided/required ports from a component. /// /// Should only be used if working with concrete types, or strict generic bounds (i.e. [`Provide`], /// [`ProvideRef`], etc.) is not an option. This is the case, for example, when working with /// `Arc<dyn `[`AbstractComponent`]`>`. pub trait DynamicPortAccess { /// **Internal API**. Dynamically obtain a mutable reference to a [`ProvidedPort`] if `self` /// provides a port of the type indicated by the passed `port_id`. /// /// This is a low-level API that is automatically implemented by /// `#[derive(ComponentDefinition)]`. Prefer the more strongly typed /// [`get_provided_port`](trait.DynamicComponentDefinition.html#method.get_provided_port). fn get_provided_port_as_any(&mut self, port_id: std::any::TypeId) -> Option<&mut dyn Any>; /// **Internal API**. Dynamically obtain a mutable reference to a [`RequiredPort`] if `self` /// requires a port of the type indicated by the passed `port_id`. /// /// This is a low-level API that is automatically implemented by /// `#[derive(ComponentDefinition)]`. Prefer the more strongly typed /// [`get_required_port`](trait.DynamicComponentDefinition.html#method.get_required_port). fn get_required_port_as_any(&mut self, port_id: std::any::TypeId) -> Option<&mut dyn Any>; } impl<'a, M: MessageBounds> dyn DynamicComponentDefinition<Message = M> + 'a { /// Dynamically obtain a mutable reference to a [`ProvidedPort<P>`](ProvidedPort) if `self` /// provides a port of type `P`. pub fn get_provided_port<P: Port>(&mut self) -> Option<&mut ProvidedPort<P>> { self.get_provided_port_as_any(std::any::TypeId::of::<P>()) .and_then(|any| any.downcast_mut()) } /// Dynamically obtain a mutable reference to a [`RequiredPort<P>`](RequiredPort) if `self` /// requires a port of type `P`. pub fn get_required_port<P: Port>(&mut self) -> Option<&mut RequiredPort<P>> { self.get_required_port_as_any(std::any::TypeId::of::<P>()) .and_then(|any| any.downcast_mut()) } } /// Mutex guard guarding a [`DynamicComponentDefinition`] trait object. pub type DynamicComponentDefinitionMutexGuard<'a, M> = OwningRefMut<Box<dyn Erased + 'a>, dyn DynamicComponentDefinition<Message = M>>; /// Error for when the component definition lock has been poisoned #[derive(Debug)] pub struct LockPoisoned; impl fmt::Display for LockPoisoned { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "component definition lock has been poisoned") } } impl std::error::Error for LockPoisoned {} /// An object-safe trait that exposes most of functionality of a [`Component`] that isn't /// dependent on a particular [`ComponentDefinition`]. /// /// Useful if you want to reduce code bloat by removing the generic parameter from `Component<CD>`. /// /// See also: [`ActorRefFactory`] and [`CoreContainer`], which this trait inherits. pub trait AbstractComponent: ActorRefFactory + CoreContainer + Any { /// Returns a mutable reference to the underlying component definition as a /// [`DynamicComponentDefinition`] trait object. /// /// This can only be done if you have a reference to the component instance that isn't hidden /// behind an [Arc](std::sync::Arc). For example, after the system shuts down and your code /// holds on to the last reference to a component you can use [get_mut](std::sync::Arc::get_mut) /// or [try_unwrap](std::sync::Arc::try_unwrap). fn dyn_definition_mut( &mut self, ) -> &mut dyn DynamicComponentDefinition<Message = Self::Message>; /// Locks the component definition mutex and returns a guard that can be dereferenced to /// access a [`DynamicComponentDefinition`] trait object. fn lock_dyn_definition( &self, ) -> Result<DynamicComponentDefinitionMutexGuard<Self::Message>, LockPoisoned>; /// Views self as [`Any`](std::any::Any). Can be used to downcast to a concrete [`Component`]. fn as_any(&self) -> &dyn Any; } impl<C> AbstractComponent for Component<C> where C: ComponentTraits + ComponentLifecycle, { fn dyn_definition_mut( &mut self, ) -> &mut dyn DynamicComponentDefinition<Message = Self::Message> { self.definition_mut() } fn lock_dyn_definition( &self, ) -> Result<DynamicComponentDefinitionMutexGuard<Self::Message>, LockPoisoned> { let lock = self.mutable_core.lock().map_err(|_| LockPoisoned)?; let res = OwningRefMut::new(Box::new(lock)) .map_mut(|l| { (&mut l.deref_mut().definition) as &mut dyn DynamicComponentDefinition<Message = Self::Message> }) .erase_owner(); Ok(res) } fn as_any(&self) -> &dyn Any { self } } impl<M: MessageBounds> dyn AbstractComponent<Message = M> { /// Execute a function on the underlying component definition /// and return the result. The component definition will be accessed as a /// [`DynamicComponentDefinition`] trait object. /// /// This method will attempt to lock the mutex, and then apply `f` to the component definition /// inside the guard. /// /// This method wraps the mutex guard in an additional allocation. Prefer /// [`Component::on_definition`] where possible. pub fn on_dyn_definition<F, R>(&self, f: F) -> R where F: FnOnce(&mut dyn DynamicComponentDefinition<Message = M>) -> R, { let mut lock = self.lock_dyn_definition().unwrap(); f(&mut *lock) } }