1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
use super::*;

/// Object-safe part of [`ComponentDefinition`].
///
/// This trait aggregates all the object-safe super-traits of [`ComponentDefinition`] to make
/// trait objects possible while rust doesn't have multi-trait trait-objects.
pub trait DynamicComponentDefinition:
    DynamicPortAccess + ActorRaw + ComponentLifecycle + Send
{
}

impl<T> DynamicComponentDefinition for T where
    T: DynamicPortAccess + ActorRaw + ComponentLifecycle + Send
{
}

/// The core trait every component must implement
///
/// Should usually simply be derived using `#[derive(ComponentDefinition)]`.
///
/// Only implement this manually if you need special execution logic,
/// for example for custom fairness models.
///
/// # Note
///
/// The derive macro additionally provides implementation of
/// [ProvideRef](ProvideRef) or [RequireRef](RequireRef) for each of the
/// component's ports. It is generally recommended to do so as well, when not
/// using the derive macro, as it enables some rather convenient APIs.
pub trait ComponentDefinition: DynamicComponentDefinition
where
    Self: Sized + 'static,
{
    /// Prepare the component for being run
    ///
    /// You *must* call [initialise](ComponentContext::initialise) on this
    /// component's context instance.
    ///
    /// You *must* call [set_parent](ProvidedPort::set_parent) (or [RequiredPort::set_parent](RequiredPort::set_parent))
    /// for each of the component's ports.
    fn setup(&mut self, self_component: Arc<Component<Self>>) -> ();

    /// Execute events on the component's ports
    ///
    /// You may run up to `max_events` events from the component's ports.
    ///
    /// The `skip` value normally contains the offset where the last invocation stopped.
    /// However, you can specify the next value when you create the returning [ExecuteResult](ExecuteResult),
    /// so you can custome the semantics of this value, if desired.
    fn execute(&mut self, max_events: usize, skip: usize) -> ExecuteResult;

    /// Return a reference the component's context field
    fn ctx(&self) -> &ComponentContext<Self>;

    /// Return a mutable reference the component's context field
    fn ctx_mut(&mut self) -> &mut ComponentContext<Self>;

    /// Return the name of the component's type
    ///
    /// This is only used for the logging MDC, so you can technically
    /// return whatever you like. It simply helps with debugging if it's related
    /// to the actual struct name.
    fn type_name() -> &'static str;

    /// Run a Future on this component, allowing it mutable
    /// access to the component's internal state on every poll.
    ///
    /// Please see the documentation for [ComponentDefinitionAccess](ComponentDefinitionAccess)
    /// for details on how the internal state may (and may not) be used.
    ///
    /// # Example
    ///
    /// ```
    /// # use kompact::prelude::*;
    ///
    /// #[derive(ComponentDefinition, Actor)]
    /// struct AsyncComponent {
    ///    ctx: ComponentContext<Self>,
    ///    flag: bool,
    /// }
    /// impl AsyncComponent {
    ///     fn new() -> Self {
    ///         AsyncComponent {
    ///             ctx: ComponentContext::uninitialised(),
    ///             flag: false,    
    ///         }
    ///     }   
    /// }
    /// impl ComponentLifecycle for AsyncComponent {
    ///     fn on_start(&mut self) -> Handled {
    ///         // on nightly you can just write: async move |mut async_self| {...}
    ///         self.spawn_local(move |mut async_self| async move {
    ///             async_self.flag = true;
    ///             Handled::Ok
    ///         });
    ///         Handled::Ok
    ///     }   
    /// }
    /// ```
    ///
    /// # See Also
    ///
    /// In order to suspend processing of all other messages and events while completing a
    /// future, use [block_on](Handled::block_on).
    ///
    /// In order to run a large future which does not need access to component's internal state
    /// at all or until the very end, consider using [spawn_off](ComponentDefinition::spawn_off).
    fn spawn_local<F>(&mut self, f: impl FnOnce(ComponentDefinitionAccess<Self>) -> F)
    where
        Self: 'static,
        F: futures::Future<Output = Handled> + Send + 'static,
    {
        let future = future_task::non_blocking(self, f);
        future.schedule();
        let tag = future.tag();
        self.ctx_mut().non_blocking_futures.insert(tag, future);
    }

    /// Run a Future on this system's executor pool and return a handle to the result
    ///
    /// Handles can be awaited like any other future.
    ///
    /// # Note
    ///
    /// The current API is not as efficient as calling [FuturesExecutor::spawn](executors::FuturesExecutor::spawn)
    /// directly, due to some trait object indirection in Kompact systems.
    /// Thus, if performance is important, it is recommended to maintain a (non trait-object) handle
    /// to the actual `Executor` pool being used and call its `spawn` function instead.
    /// This API is really just a somewhat roundabout convenience for doing the same.
    fn spawn_off<R: Send + 'static>(
        &self,
        future: impl futures::Future<Output = R> + 'static + Send,
    ) -> JoinHandle<R> {
        self.ctx().system().spawn(future)
    }
}

/// A trait to customise handling of lifecycle events
///
/// This trait replaces the pre-v0.9 `Provide<ControlPort>` requirement.
pub trait ComponentLifecycle: ComponentLogging {
    /// Gets invoked every time a component receives a Start event
    ///
    /// The default implementation simply logs something at debug level.
    fn on_start(&mut self) -> Handled
    where
        Self: 'static,
    {
        debug!(self.log(), "Starting...");
        Handled::Ok
    }

    /// Gets invoked every time a component receives a Stop event
    ///
    /// The default implementation simply logs something at debug level.
    fn on_stop(&mut self) -> Handled
    where
        Self: 'static,
    {
        debug!(self.log(), "Stopping...");
        Handled::Ok
    }

    /// Gets invoked every time a component receives a Kill event
    ///
    /// The default implementation simply logs something at debug level.
    fn on_kill(&mut self) -> Handled
    where
        Self: 'static,
    {
        debug!(self.log(), "Killing...");
        Handled::Ok
    }
}

/// A mechanism for dynamically getting references to provided/required ports from a component.
///
/// Should only be used if working with concrete types, or strict generic bounds (i.e. [`Provide`],
/// [`ProvideRef`], etc.) is not an option. This is the case, for example, when working with
/// `Arc<dyn `[`AbstractComponent`]`>`.
pub trait DynamicPortAccess {
    /// **Internal API**. Dynamically obtain a mutable reference to a [`ProvidedPort`] if `self`
    /// provides a port of the type indicated by the passed `port_id`.
    ///
    /// This is a low-level API that is automatically implemented by
    /// `#[derive(ComponentDefinition)]`. Prefer the more strongly typed
    /// [`get_provided_port`](trait.DynamicComponentDefinition.html#method.get_provided_port).
    fn get_provided_port_as_any(&mut self, port_id: std::any::TypeId) -> Option<&mut dyn Any>;

    /// **Internal API**. Dynamically obtain a mutable reference to a [`RequiredPort`] if `self`
    /// requires a port of the type indicated by the passed `port_id`.
    ///
    /// This is a low-level API that is automatically implemented by
    /// `#[derive(ComponentDefinition)]`. Prefer the more strongly typed
    /// [`get_required_port`](trait.DynamicComponentDefinition.html#method.get_required_port).
    fn get_required_port_as_any(&mut self, port_id: std::any::TypeId) -> Option<&mut dyn Any>;
}

impl<'a, M: MessageBounds> dyn DynamicComponentDefinition<Message = M> + 'a {
    /// Dynamically obtain a mutable reference to a [`ProvidedPort<P>`](ProvidedPort) if `self`
    /// provides a port of type `P`.
    pub fn get_provided_port<P: Port>(&mut self) -> Option<&mut ProvidedPort<P>> {
        self.get_provided_port_as_any(std::any::TypeId::of::<P>())
            .and_then(|any| any.downcast_mut())
    }

    /// Dynamically obtain a mutable reference to a [`RequiredPort<P>`](RequiredPort) if `self`
    /// requires a port of type `P`.
    pub fn get_required_port<P: Port>(&mut self) -> Option<&mut RequiredPort<P>> {
        self.get_required_port_as_any(std::any::TypeId::of::<P>())
            .and_then(|any| any.downcast_mut())
    }
}

/// Mutex guard guarding a [`DynamicComponentDefinition`] trait object.
pub type DynamicComponentDefinitionMutexGuard<'a, M> =
    OwningRefMut<Box<dyn Erased + 'a>, dyn DynamicComponentDefinition<Message = M>>;

/// Error for when the component definition lock has been poisoned
#[derive(Debug)]
pub struct LockPoisoned;
impl fmt::Display for LockPoisoned {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "component definition lock has been poisoned")
    }
}
impl std::error::Error for LockPoisoned {}

/// An object-safe trait that exposes most of functionality of a [`Component`] that isn't
/// dependent on a particular [`ComponentDefinition`].
///
/// Useful if you want to reduce code bloat by removing the generic parameter from `Component<CD>`.
///
/// See also: [`ActorRefFactory`] and [`CoreContainer`], which this trait inherits.
pub trait AbstractComponent: ActorRefFactory + CoreContainer + Any {
    /// Returns a mutable reference to the underlying component definition as a
    /// [`DynamicComponentDefinition`] trait object.
    ///
    /// This can only be done if you have a reference to the component instance that isn't hidden
    /// behind an [Arc](std::sync::Arc). For example, after the system shuts down and your code
    /// holds on to the last reference to a component you can use [get_mut](std::sync::Arc::get_mut)
    /// or [try_unwrap](std::sync::Arc::try_unwrap).
    fn dyn_definition_mut(
        &mut self,
    ) -> &mut dyn DynamicComponentDefinition<Message = Self::Message>;

    /// Locks the component definition mutex and returns a guard that can be dereferenced to
    /// access a [`DynamicComponentDefinition`] trait object.
    fn lock_dyn_definition(
        &self,
    ) -> Result<DynamicComponentDefinitionMutexGuard<Self::Message>, LockPoisoned>;

    /// Views self as [`Any`](std::any::Any). Can be used to downcast to a concrete [`Component`].
    fn as_any(&self) -> &dyn Any;
}

impl<C> AbstractComponent for Component<C>
where
    C: ComponentTraits + ComponentLifecycle,
{
    fn dyn_definition_mut(
        &mut self,
    ) -> &mut dyn DynamicComponentDefinition<Message = Self::Message> {
        self.definition_mut()
    }

    fn lock_dyn_definition(
        &self,
    ) -> Result<DynamicComponentDefinitionMutexGuard<Self::Message>, LockPoisoned> {
        let lock = self.mutable_core.lock().map_err(|_| LockPoisoned)?;
        let res = OwningRefMut::new(Box::new(lock))
            .map_mut(|l| {
                (&mut l.deref_mut().definition)
                    as &mut dyn DynamicComponentDefinition<Message = Self::Message>
            })
            .erase_owner();
        Ok(res)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl<M: MessageBounds> dyn AbstractComponent<Message = M> {
    /// Execute a function on the underlying component definition
    /// and return the result. The component definition will be accessed as a
    /// [`DynamicComponentDefinition`] trait object.
    ///
    /// This method will attempt to lock the mutex, and then apply `f` to the component definition
    /// inside the guard.
    ///
    /// This method wraps the mutex guard in an additional allocation. Prefer
    /// [`Component::on_definition`] where possible.
    pub fn on_dyn_definition<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut dyn DynamicComponentDefinition<Message = M>) -> R,
    {
        let mut lock = self.lock_dyn_definition().unwrap();
        f(&mut *lock)
    }
}