// xs/processor/actor/serve.rs

1use std::collections::HashMap;
2
3use crate::processor::actor::Actor;
4use crate::processor::{Lifecycle, LifecycleReader};
5use crate::store::{FollowOption, Frame, ReadOptions, Store};
6
7async fn start_actor(
8    frame: &Frame,
9    store: &Store,
10    topic: &str,
11) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12    match Actor::from_frame(frame, store).await {
13        Ok(actor) => {
14            actor.spawn(store.clone()).await?;
15            Ok(())
16        }
17        Err(err) => {
18            let _ = store.append(
19                Frame::builder(format!("{topic}.unregistered"))
20                    .meta(serde_json::json!({
21                        "actor_id": frame.id.to_string(),
22                        "error": err.to_string(),
23                    }))
24                    .build(),
25            );
26            Ok(())
27        }
28    }
29}
30
31pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
32    let rx = store
33        .read(ReadOptions::builder().follow(FollowOption::On).build())
34        .await;
35    let mut lifecycle = LifecycleReader::new(rx);
36    let mut compacted: HashMap<String, Frame> = HashMap::new();
37
38    while let Some(event) = lifecycle.recv().await {
39        match event {
40            Lifecycle::Historical(frame) => {
41                if let Some((topic, suffix)) = frame.topic.rsplit_once('.') {
42                    match suffix {
43                        "register" => {
44                            compacted.insert(topic.to_string(), frame);
45                        }
46                        "unregister" | "inactive" => {
47                            if let Some(meta) = &frame.meta {
48                                if let Some(actor_id) =
49                                    meta.get("actor_id").and_then(|v| v.as_str())
50                                {
51                                    if let Some(f) = compacted.get(topic) {
52                                        if f.id.to_string() == actor_id {
53                                            compacted.remove(topic);
54                                        }
55                                    }
56                                }
57                            }
58                        }
59                        _ => {}
60                    }
61                }
62            }
63            Lifecycle::Threshold(_) => {
64                let mut ordered: Vec<_> = compacted.drain().collect();
65                ordered.sort_by_key(|(_, frame)| frame.id);
66
67                for (topic, frame) in ordered {
68                    start_actor(&frame, &store, &topic).await?;
69                }
70            }
71            Lifecycle::Live(frame) => {
72                if let Some(topic) = frame.topic.strip_suffix(".register") {
73                    start_actor(&frame, &store, topic).await?;
74                }
75            }
76        }
77    }
78
79    Ok(())
80}