// xs/processor/service/serve.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use scru128::Scru128Id;
5use serde_json::json;
6use tokio::task::JoinHandle;
7
8use crate::processor::lifecycle::{Event, Slots, ThresholdPick};
9use crate::processor::service::service;
10use crate::processor::{Lifecycle, LifecycleReader};
11use crate::store::{FollowOption, Frame, ReadOptions, Store};
12
13async fn try_start(
14    topic: &str,
15    frame: &Frame,
16    active: &mut HashMap<String, JoinHandle<()>>,
17    store: &Store,
18) {
19    if let Err(e) = handle_spawn_event(topic, frame.clone(), active, store.clone()).await {
20        let meta = json!({
21            "source_id": frame.id.to_string(),
22            "reason": e.to_string()
23        });
24
25        if let Err(e) = store.append(
26            Frame::builder(format!("xs.service.{topic}.invalid"))
27                .meta(meta)
28                .build(),
29        ) {
30            tracing::error!("Error appending error frame: {}", e);
31        }
32    }
33}
34
35async fn handle_spawn_event(
36    topic: &str,
37    frame: Frame,
38    active: &mut HashMap<String, JoinHandle<()>>,
39    store: Store,
40) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41    let key = topic.to_string();
42    if let Some(handle) = active.get(&key) {
43        if handle.is_finished() {
44            active.remove(&key);
45        } else {
46            // A service for this topic is already running. Ignore the
47            // new spawn frame; the running service will handle it as a hot
48            // reload.
49            return Ok(());
50        }
51    }
52
53    let handle = service::spawn(store, frame);
54    active.insert(key, handle);
55    Ok(())
56}
57
58/// Translate `xs.service.<name>.<event>` topics into a lifecycle event.
59fn event_from_frame(frame: &Frame) -> Option<(String, Event)> {
60    let rest = frame.topic.strip_prefix("xs.service.")?;
61    let (name, ev_tag) = split_service_event(rest)?;
62    let event = match ev_tag {
63        "create" => Event::Create { id: frame.id },
64        "term" => Event::Term,
65        "active" => Event::Active {
66            source: source_id(frame)?,
67        },
68        "invalid" => Event::Invalid {
69            source: source_id(frame)?,
70        },
71        "fin.ok" | "fin.error" | "fin.term" => Event::Fin,
72        "replaced" => Event::Replaced,
73        "stopped" => Event::Stopped,
74        _ => return None,
75    };
76    Some((name.to_string(), event))
77}
78
/// Split `<name>.<tag>` (the part of a topic after `xs.service.`) into the
/// service name and a recognised event tag.
///
/// Names may themselves contain dots; only the trailing `.<tag>` is removed.
/// Returns `None` when `rest` does not end in `.` followed by a known tag.
fn split_service_event(rest: &str) -> Option<(&str, &str)> {
    // Order matters: the two-part `fin.*` tags must be tried before `term`.
    // Otherwise `foo.fin.term` would match `.term` and yield the name
    // `foo.fin`.
    const TAGS: [&str; 9] = [
        "fin.ok", "fin.error", "fin.term", "create", "term", "active", "invalid", "replaced",
        "stopped",
    ];

    // Stripping `tag` and then the separating '.' is equivalent to stripping
    // `.{tag}`, without allocating a formatted suffix per candidate tag.
    TAGS.iter().find_map(|&tag| {
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
92
93fn source_id(frame: &Frame) -> Option<Scru128Id> {
94    let meta = frame.meta.as_ref()?;
95    let s = meta.get("source_id").and_then(|v| v.as_str())?;
96    Scru128Id::from_str(s).ok()
97}
98
99#[derive(Default)]
100struct TopicState {
101    slots: Slots,
102    /// Stash of every `.spawn` frame seen so threshold can look up by id.
103    frames: HashMap<Scru128Id, Frame>,
104}
105
106use std::str::FromStr;
107
108pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109    let rx = store
110        .read(ReadOptions::builder().follow(FollowOption::On).build())
111        .await;
112    let mut lifecycle = LifecycleReader::new(rx);
113    let mut states: HashMap<String, TopicState> = HashMap::new();
114    let mut active: HashMap<String, JoinHandle<()>> = HashMap::new();
115
116    while let Some(event) = lifecycle.recv().await {
117        match event {
118            Lifecycle::Historical(frame) => {
119                if let Some((topic, ev)) = event_from_frame(&frame) {
120                    let state = states.entry(topic).or_default();
121                    if let Event::Create { id } = &ev {
122                        state.frames.insert(*id, frame.clone());
123                    }
124                    state.slots.apply(ev);
125                }
126            }
127            Lifecycle::Threshold(_) => {
128                // Service has no `confirmed` (we don't process `.running`
129                // historically), so threshold picks are always pending-only
130                // with no fallback. Order by the picked id to keep
131                // historical-order behaviour stable.
132                let mut picks: Vec<(String, ThresholdPick)> = states
133                    .iter()
134                    .map(|(t, s)| (t.clone(), s.slots.threshold()))
135                    .collect();
136                picks.sort_by_key(|(_, p)| match p {
137                    ThresholdPick::Start { id, .. } => Some(*id),
138                    ThresholdPick::None => None,
139                });
140                for (topic, pick) in picks {
141                    if let ThresholdPick::Start { id, .. } = pick {
142                        if let Some(state) = states.get(&topic) {
143                            if let Some(frame) = state.frames.get(&id).cloned() {
144                                try_start(&topic, &frame, &mut active, &store).await;
145                            }
146                        }
147                    }
148                }
149            }
150            Lifecycle::Live(frame) => {
151                if frame.topic == "xs.stopping" {
152                    break;
153                }
154                if let Some((topic, ev)) = event_from_frame(&frame) {
155                    let is_create = matches!(ev, Event::Create { .. });
156                    let removes_active = matches!(ev, Event::Fin | Event::Stopped);
157                    let state = states.entry(topic.clone()).or_default();
158                    if let Event::Create { id } = &ev {
159                        state.frames.insert(*id, frame.clone());
160                    }
161                    state.slots.apply(ev);
162                    if is_create {
163                        try_start(&topic, &frame, &mut active, &store).await;
164                    } else if removes_active {
165                        active.remove(&topic);
166                    }
167                }
168            }
169        }
170    }
171
172    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
173    for (_, handle) in active {
174        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
175        let _ = tokio::time::timeout(remaining, handle).await;
176    }
177
178    Ok(())
179}