xs/processor/actor/
serve.rs1use std::collections::HashMap;
2use std::str::FromStr;
3
4use scru128::Scru128Id;
5
6use crate::processor::actor::Actor;
7use crate::processor::lifecycle::{Event, Slots, ThresholdPick};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
/// Result of one attempt to start an actor from its `create` frame.
///
/// Produced by `try_start_actor`; callers use it to decide whether a fallback
/// frame should be tried.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StartOutcome {
    /// The actor was built from the frame and spawned.
    Spawned,
    /// The frame could not be turned into an actor; an
    /// `xs.actor.<name>.invalid` frame was submitted to the store instead.
    Invalid,
}
15
16async fn try_start_actor(
17 frame: &Frame,
18 store: &Store,
19 name: &str,
20) -> Result<StartOutcome, Box<dyn std::error::Error + Send + Sync>> {
21 match Actor::from_frame(frame, store).await {
22 Ok(actor) => {
23 actor.spawn(store.clone()).await?;
24 Ok(StartOutcome::Spawned)
25 }
26 Err(err) => {
27 let _ = store.append(
28 Frame::builder(format!("xs.actor.{name}.invalid"))
29 .meta(serde_json::json!({
30 "actor_id": frame.id.to_string(),
31 "error": err.to_string(),
32 }))
33 .build(),
34 );
35 Ok(StartOutcome::Invalid)
36 }
37 }
38}
39
40fn event_from_frame(frame: &Frame) -> Option<(String, Event)> {
54 let rest = frame.topic.strip_prefix("xs.actor.")?;
55 let (name, ev_tag) = split_actor_event(rest)?;
59 let event = match ev_tag {
60 "create" => Event::Create { id: frame.id },
61 "term" => Event::Term,
62 "active" => Event::Active {
63 source: source_id(frame)?,
64 },
65 "invalid" => Event::Invalid {
66 source: source_id(frame)?,
67 },
68 "fin.term" | "fin.error" | "fin.ok" => Event::Fin,
69 "replaced" => Event::Replaced,
70 "stopped" => Event::Stopped,
71 _ => return None,
72 };
73 Some((name.to_string(), event))
74}
75
/// Splits the part of a topic that follows `xs.actor.` into
/// `(actor name, event tag)`.
///
/// The tag must be one of the known lifecycle suffixes and must be separated
/// from the name by a `.`. Returns `None` when no known suffix matches.
///
/// The name is everything before the separator, so it may itself contain dots
/// (and may be empty).
fn split_actor_event(rest: &str) -> Option<(&str, &str)> {
    // Priority order matters: the compound `fin.*` tags must be tried before
    // the single-segment ones, otherwise `foo.fin.term` would be read as actor
    // `foo.fin` with tag `term`.
    const TAGS: [&str; 9] = [
        "fin.term", "fin.error", "fin.ok", "create", "term", "active", "invalid", "replaced",
        "stopped",
    ];

    TAGS.iter().find_map(|&tag| {
        // Strip the tag, then the `.` separator; equivalent to stripping
        // ".{tag}" but without allocating a temporary string per candidate.
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
91
92fn source_id(frame: &Frame) -> Option<Scru128Id> {
93 let meta = frame.meta.as_ref()?;
94 let s = meta.get("actor_id").and_then(|v| v.as_str())?;
95 Scru128Id::from_str(s).ok()
96}
97
98#[derive(Default)]
99struct TopicState {
100 slots: Slots,
101 frames: HashMap<Scru128Id, Frame>,
103}
104
105async fn execute_pick(
106 pick: ThresholdPick,
107 state: &TopicState,
108 topic: &str,
109 store: &Store,
110) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
111 let (id, fallback) = match pick {
112 ThresholdPick::None => return Ok(()),
113 ThresholdPick::Start { id, fallback } => (id, fallback),
114 };
115 let Some(frame) = state.frames.get(&id) else {
116 return Ok(()); };
118 let outcome = try_start_actor(frame, store, topic).await?;
119 if matches!(outcome, StartOutcome::Invalid) {
120 if let Some(fb_id) = fallback {
121 if let Some(fb_frame) = state.frames.get(&fb_id) {
122 let _ = try_start_actor(fb_frame, store, topic).await?;
123 }
124 }
125 }
126 Ok(())
127}
128
129pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130 let rx = store
131 .read(ReadOptions::builder().follow(FollowOption::On).build())
132 .await;
133 let mut lifecycle = LifecycleReader::new(rx);
134 let mut states: HashMap<String, TopicState> = HashMap::new();
135
136 while let Some(event) = lifecycle.recv().await {
137 match event {
138 Lifecycle::Historical(frame) => {
139 if let Some((topic, ev)) = event_from_frame(&frame) {
140 let state = states.entry(topic).or_default();
141 if let Event::Create { id } = &ev {
142 state.frames.insert(*id, frame.clone());
143 }
144 state.slots.apply(ev);
145 }
146 }
147 Lifecycle::Threshold(_) => {
148 let mut picks: Vec<(String, ThresholdPick)> = states
152 .iter()
153 .map(|(t, s)| (t.clone(), s.slots.threshold()))
154 .collect();
155 picks.sort_by_key(|(_, p)| match p {
156 ThresholdPick::Start { id, .. } => Some(*id),
157 ThresholdPick::None => None,
158 });
159 for (topic, pick) in picks {
160 if let Some(state) = states.get(&topic) {
161 execute_pick(pick, state, &topic, &store).await?;
162 }
163 }
164 }
165 Lifecycle::Live(frame) => {
166 if let Some((topic, ev)) = event_from_frame(&frame) {
167 let is_create = matches!(ev, Event::Create { .. });
168 let prior_confirmed = states.get(&topic).and_then(|s| s.slots.confirmed());
175 let state = states.entry(topic.clone()).or_default();
176 if let Event::Create { id } = &ev {
177 state.frames.insert(*id, frame.clone());
178 }
179 state.slots.apply(ev);
180 if is_create {
181 let outcome = try_start_actor(&frame, &store, &topic).await?;
182 if matches!(outcome, StartOutcome::Invalid) {
183 if let Some(fb_id) = prior_confirmed {
190 if let Some(fb_frame) = states
191 .get(&topic)
192 .and_then(|s| s.frames.get(&fb_id).cloned())
193 {
194 let _ = try_start_actor(&fb_frame, &store, &topic).await?;
195 }
196 }
197 }
198 }
199 }
200 }
201 }
202 }
203
204 Ok(())
205}