Skip to main content

xs/processor/actor/
serve.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3
4use scru128::Scru128Id;
5
6use crate::processor::actor::Actor;
7use crate::processor::lifecycle::{Event, Slots, ThresholdPick};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11enum StartOutcome {
12    Spawned,
13    Invalid,
14}
15
16async fn try_start_actor(
17    frame: &Frame,
18    store: &Store,
19    name: &str,
20) -> Result<StartOutcome, Box<dyn std::error::Error + Send + Sync>> {
21    match Actor::from_frame(frame, store).await {
22        Ok(actor) => {
23            actor.spawn(store.clone()).await?;
24            Ok(StartOutcome::Spawned)
25        }
26        Err(err) => {
27            let _ = store.append(
28                Frame::builder(format!("xs.actor.{name}.invalid"))
29                    .meta(serde_json::json!({
30                        "actor_id": frame.id.to_string(),
31                        "error": err.to_string(),
32                    }))
33                    .build(),
34            );
35            Ok(StartOutcome::Invalid)
36        }
37    }
38}
39
40/// Translate `xs.actor.<name>.<event>` topics into a lifecycle event.
41///
42/// Returns `(name, event)` if the frame is an actor lifecycle frame.
43/// Maps:
44///   .create     -> Event::Create
45///   .term       -> Event::Term
46///   .active     -> Event::Active   (source from meta.actor_id)
47///   .invalid    -> Event::Invalid  (source from meta.actor_id)
48///   .fin.term   -> Event::Fin
49///   .fin.error  -> Event::Fin
50///   .fin.ok     -> Event::Fin
51///   .replaced   -> Event::Replaced
52///   .stopped    -> Event::Stopped
53fn event_from_frame(frame: &Frame) -> Option<(String, Event)> {
54    let rest = frame.topic.strip_prefix("xs.actor.")?;
55    // The event suffix is everything after the last segment before the
56    // event tokens. event tokens are `.<simple>` or `.fin.<simple>`. The
57    // name is everything before the first such token from the right.
58    let (name, ev_tag) = split_actor_event(rest)?;
59    let event = match ev_tag {
60        "create" => Event::Create { id: frame.id },
61        "term" => Event::Term,
62        "active" => Event::Active {
63            source: source_id(frame)?,
64        },
65        "invalid" => Event::Invalid {
66            source: source_id(frame)?,
67        },
68        "fin.term" | "fin.error" | "fin.ok" => Event::Fin,
69        "replaced" => Event::Replaced,
70        "stopped" => Event::Stopped,
71        _ => return None,
72    };
73    Some((name.to_string(), event))
74}
75
76/// Split `<name>.<event>` where `<event>` is one of the known actor event
77/// tags. `fin.*` is two segments; everything else is one.
78fn split_actor_event(rest: &str) -> Option<(&str, &str)> {
79    for tag in ["fin.term", "fin.error", "fin.ok"] {
80        if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
81            return Some((name, tag));
82        }
83    }
84    for tag in ["create", "term", "active", "invalid", "replaced", "stopped"] {
85        if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
86            return Some((name, tag));
87        }
88    }
89    None
90}
91
92fn source_id(frame: &Frame) -> Option<Scru128Id> {
93    let meta = frame.meta.as_ref()?;
94    let s = meta.get("actor_id").and_then(|v| v.as_str())?;
95    Scru128Id::from_str(s).ok()
96}
97
98#[derive(Default)]
99struct TopicState {
100    slots: Slots,
101    /// Stash of every `.register` frame seen so threshold can look up by id.
102    frames: HashMap<Scru128Id, Frame>,
103}
104
105async fn execute_pick(
106    pick: ThresholdPick,
107    state: &TopicState,
108    topic: &str,
109    store: &Store,
110) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
111    let (id, fallback) = match pick {
112        ThresholdPick::None => return Ok(()),
113        ThresholdPick::Start { id, fallback } => (id, fallback),
114    };
115    let Some(frame) = state.frames.get(&id) else {
116        return Ok(()); // shouldn't happen, but be safe
117    };
118    let outcome = try_start_actor(frame, store, topic).await?;
119    if matches!(outcome, StartOutcome::Invalid) {
120        if let Some(fb_id) = fallback {
121            if let Some(fb_frame) = state.frames.get(&fb_id) {
122                let _ = try_start_actor(fb_frame, store, topic).await?;
123            }
124        }
125    }
126    Ok(())
127}
128
129pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130    let rx = store
131        .read(ReadOptions::builder().follow(FollowOption::On).build())
132        .await;
133    let mut lifecycle = LifecycleReader::new(rx);
134    let mut states: HashMap<String, TopicState> = HashMap::new();
135
136    while let Some(event) = lifecycle.recv().await {
137        match event {
138            Lifecycle::Historical(frame) => {
139                if let Some((topic, ev)) = event_from_frame(&frame) {
140                    let state = states.entry(topic).or_default();
141                    if let Event::Create { id } = &ev {
142                        state.frames.insert(*id, frame.clone());
143                    }
144                    state.slots.apply(ev);
145                }
146            }
147            Lifecycle::Threshold(_) => {
148                // Iterate topics in a stable order (by the picked id, when
149                // present) so behaviour matches the previous code that sorted
150                // by frame.id.
151                let mut picks: Vec<(String, ThresholdPick)> = states
152                    .iter()
153                    .map(|(t, s)| (t.clone(), s.slots.threshold()))
154                    .collect();
155                picks.sort_by_key(|(_, p)| match p {
156                    ThresholdPick::Start { id, .. } => Some(*id),
157                    ThresholdPick::None => None,
158                });
159                for (topic, pick) in picks {
160                    if let Some(state) = states.get(&topic) {
161                        execute_pick(pick, state, &topic, &store).await?;
162                    }
163                }
164            }
165            Lifecycle::Live(frame) => {
166                if let Some((topic, ev)) = event_from_frame(&frame) {
167                    let is_create = matches!(ev, Event::Create { .. });
168                    // Capture the confirmed (last known-good) create id BEFORE
169                    // applying this event, so a parse-fail on a hot-replace
170                    // can fall back to it. After Event::Create, pending is
171                    // updated but confirmed is unchanged, so this is also
172                    // safe to read post-apply, doing it first just matches
173                    // the algorithm's intent.
174                    let prior_confirmed = states.get(&topic).and_then(|s| s.slots.confirmed());
175                    let state = states.entry(topic.clone()).or_default();
176                    if let Event::Create { id } = &ev {
177                        state.frames.insert(*id, frame.clone());
178                    }
179                    state.slots.apply(ev);
180                    if is_create {
181                        let outcome = try_start_actor(&frame, &store, &topic).await?;
182                        if matches!(outcome, StartOutcome::Invalid) {
183                            // Live hot-replace fallback (closes deficiency
184                            // #5): the running instance has already exited
185                            // because it saw this .create on its stream. If
186                            // the replacement failed to parse, restart from
187                            // the last confirmed create so the system isn't
188                            // left empty.
189                            if let Some(fb_id) = prior_confirmed {
190                                if let Some(fb_frame) = states
191                                    .get(&topic)
192                                    .and_then(|s| s.frames.get(&fb_id).cloned())
193                                {
194                                    let _ = try_start_actor(&fb_frame, &store, &topic).await?;
195                                }
196                            }
197                        }
198                    }
199                }
200            }
201        }
202    }
203
204    Ok(())
205}