nitinol_process/
lifecycle.rs

1use nitinol_core::identifier::ToEntityId;
2use crate::channel::ProcessApplier;
3use crate::{Process, Context};
4use crate::errors::AlreadyExist;
5use crate::receptor::Receptor;
6use crate::registry::ProcessRegistry;
7
8pub async fn run<T: Process>(
9    id: impl ToEntityId,
10    entity: T,
11    start_seq: i64,
12    registry: ProcessRegistry
13) -> Result<Receptor<T>, AlreadyExist> {
14    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Box<dyn ProcessApplier<T>>>();
15
16    let entity_id = id.to_entity_id();
17    let refs = Receptor { channel: tx };
18    
19    let context = Context::new(start_seq, registry.clone());
20    
21    registry.register(entity_id.clone(), refs.clone()).await?;
22    
23    tokio::spawn(async move {
24        let id = entity_id;
25        let mut entity = entity;
26        let mut context = context;
27        let registry = registry;
28        
29        entity.start(&mut context).await;
30        
31        while let Some(rx) = rx.recv().await {
32            if let Err(e) = rx.apply(&mut entity, &mut context).await {
33                tracing::error!("{e}");
34            }
35            
36            if !context.is_active().await {
37                tracing::warn!("lifecycle ended.");
38                break;
39            }
40        }
41        
42        if let Err(e) = registry.deregister(&id).await {
43            tracing::error!("{e}");
44        }
45        
46        entity.stop(&mut context).await;
47    });
48    
49    Ok(refs)
50}