// xs/processor/actor/serve.rs
use std::collections::HashMap;

use crate::processor::actor::Actor;
use crate::processor::{Lifecycle, LifecycleReader};
use crate::store::{FollowOption, Frame, ReadOptions, Store};

7async fn start_actor(
8 frame: &Frame,
9 store: &Store,
10 topic: &str,
11) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12 match Actor::from_frame(frame, store).await {
13 Ok(actor) => {
14 actor.spawn(store.clone()).await?;
15 Ok(())
16 }
17 Err(err) => {
18 let _ = store.append(
19 Frame::builder(format!("{topic}.unregistered"))
20 .meta(serde_json::json!({
21 "actor_id": frame.id.to_string(),
22 "error": err.to_string(),
23 }))
24 .build(),
25 );
26 Ok(())
27 }
28 }
29}
30
31pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
32 let rx = store
33 .read(ReadOptions::builder().follow(FollowOption::On).build())
34 .await;
35 let mut lifecycle = LifecycleReader::new(rx);
36 let mut compacted: HashMap<String, Frame> = HashMap::new();
37
38 while let Some(event) = lifecycle.recv().await {
39 match event {
40 Lifecycle::Historical(frame) => {
41 if let Some((topic, suffix)) = frame.topic.rsplit_once('.') {
42 match suffix {
43 "register" => {
44 compacted.insert(topic.to_string(), frame);
45 }
46 "unregister" | "inactive" => {
47 if let Some(meta) = &frame.meta {
48 if let Some(actor_id) =
49 meta.get("actor_id").and_then(|v| v.as_str())
50 {
51 if let Some(f) = compacted.get(topic) {
52 if f.id.to_string() == actor_id {
53 compacted.remove(topic);
54 }
55 }
56 }
57 }
58 }
59 _ => {}
60 }
61 }
62 }
63 Lifecycle::Threshold(_) => {
64 let mut ordered: Vec<_> = compacted.drain().collect();
65 ordered.sort_by_key(|(_, frame)| frame.id);
66
67 for (topic, frame) in ordered {
68 start_actor(&frame, &store, &topic).await?;
69 }
70 }
71 Lifecycle::Live(frame) => {
72 if let Some(topic) = frame.topic.strip_suffix(".register") {
73 start_actor(&frame, &store, topic).await?;
74 }
75 }
76 }
77 }
78
79 Ok(())
80}