nitinol_process/
lifecycle.rs1use nitinol_core::identifier::ToEntityId;
2use crate::channel::ProcessApplier;
3use crate::{Process, Context};
4use crate::errors::AlreadyExist;
5use crate::receptor::Receptor;
6use crate::registry::ProcessRegistry;
7
8pub async fn run<T: Process>(
9 id: impl ToEntityId,
10 entity: T,
11 start_seq: i64,
12 registry: ProcessRegistry
13) -> Result<Receptor<T>, AlreadyExist> {
14 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Box<dyn ProcessApplier<T>>>();
15
16 let entity_id = id.to_entity_id();
17 let refs = Receptor { channel: tx };
18
19 let context = Context::new(start_seq, registry.clone());
20
21 registry.register(entity_id.clone(), refs.clone()).await?;
22
23 tokio::spawn(async move {
24 let id = entity_id;
25 let mut entity = entity;
26 let mut context = context;
27 let registry = registry;
28
29 entity.start(&mut context).await;
30
31 while let Some(rx) = rx.recv().await {
32 if let Err(e) = rx.apply(&mut entity, &mut context).await {
33 tracing::error!("{e}");
34 }
35
36 if !context.is_active().await {
37 tracing::warn!("lifecycle ended.");
38 break;
39 }
40 }
41
42 if let Err(e) = registry.deregister(&id).await {
43 tracing::error!("{e}");
44 }
45
46 entity.stop(&mut context).await;
47 });
48
49 Ok(refs)
50}