xs/processor/service/
serve.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use scru128::Scru128Id;
5use serde_json::json;
6use tokio::task::JoinHandle;
7
8use crate::processor::lifecycle::{Event, Slots, ThresholdPick};
9use crate::processor::service::service;
10use crate::processor::{Lifecycle, LifecycleReader};
11use crate::store::{FollowOption, Frame, ReadOptions, Store};
12
13async fn try_start(
14 topic: &str,
15 frame: &Frame,
16 active: &mut HashMap<String, JoinHandle<()>>,
17 store: &Store,
18) {
19 if let Err(e) = handle_spawn_event(topic, frame.clone(), active, store.clone()).await {
20 let meta = json!({
21 "source_id": frame.id.to_string(),
22 "reason": e.to_string()
23 });
24
25 if let Err(e) = store.append(
26 Frame::builder(format!("xs.service.{topic}.invalid"))
27 .meta(meta)
28 .build(),
29 ) {
30 tracing::error!("Error appending error frame: {}", e);
31 }
32 }
33}
34
35async fn handle_spawn_event(
36 topic: &str,
37 frame: Frame,
38 active: &mut HashMap<String, JoinHandle<()>>,
39 store: Store,
40) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41 let key = topic.to_string();
42 if let Some(handle) = active.get(&key) {
43 if handle.is_finished() {
44 active.remove(&key);
45 } else {
46 return Ok(());
50 }
51 }
52
53 let handle = service::spawn(store, frame);
54 active.insert(key, handle);
55 Ok(())
56}
57
58fn event_from_frame(frame: &Frame) -> Option<(String, Event)> {
60 let rest = frame.topic.strip_prefix("xs.service.")?;
61 let (name, ev_tag) = split_service_event(rest)?;
62 let event = match ev_tag {
63 "create" => Event::Create { id: frame.id },
64 "term" => Event::Term,
65 "active" => Event::Active {
66 source: source_id(frame)?,
67 },
68 "invalid" => Event::Invalid {
69 source: source_id(frame)?,
70 },
71 "fin.ok" | "fin.error" | "fin.term" => Event::Fin,
72 "replaced" => Event::Replaced,
73 "stopped" => Event::Stopped,
74 _ => return None,
75 };
76 Some((name.to_string(), event))
77}
78
79fn split_service_event(rest: &str) -> Option<(&str, &str)> {
80 for tag in ["fin.ok", "fin.error", "fin.term"] {
81 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
82 return Some((name, tag));
83 }
84 }
85 for tag in ["create", "term", "active", "invalid", "replaced", "stopped"] {
86 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
87 return Some((name, tag));
88 }
89 }
90 None
91}
92
93fn source_id(frame: &Frame) -> Option<Scru128Id> {
94 let meta = frame.meta.as_ref()?;
95 let s = meta.get("source_id").and_then(|v| v.as_str())?;
96 Scru128Id::from_str(s).ok()
97}
98
99#[derive(Default)]
100struct TopicState {
101 slots: Slots,
102 frames: HashMap<Scru128Id, Frame>,
104}
105
106use std::str::FromStr;
107
108pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109 let rx = store
110 .read(ReadOptions::builder().follow(FollowOption::On).build())
111 .await;
112 let mut lifecycle = LifecycleReader::new(rx);
113 let mut states: HashMap<String, TopicState> = HashMap::new();
114 let mut active: HashMap<String, JoinHandle<()>> = HashMap::new();
115
116 while let Some(event) = lifecycle.recv().await {
117 match event {
118 Lifecycle::Historical(frame) => {
119 if let Some((topic, ev)) = event_from_frame(&frame) {
120 let state = states.entry(topic).or_default();
121 if let Event::Create { id } = &ev {
122 state.frames.insert(*id, frame.clone());
123 }
124 state.slots.apply(ev);
125 }
126 }
127 Lifecycle::Threshold(_) => {
128 let mut picks: Vec<(String, ThresholdPick)> = states
133 .iter()
134 .map(|(t, s)| (t.clone(), s.slots.threshold()))
135 .collect();
136 picks.sort_by_key(|(_, p)| match p {
137 ThresholdPick::Start { id, .. } => Some(*id),
138 ThresholdPick::None => None,
139 });
140 for (topic, pick) in picks {
141 if let ThresholdPick::Start { id, .. } = pick {
142 if let Some(state) = states.get(&topic) {
143 if let Some(frame) = state.frames.get(&id).cloned() {
144 try_start(&topic, &frame, &mut active, &store).await;
145 }
146 }
147 }
148 }
149 }
150 Lifecycle::Live(frame) => {
151 if frame.topic == "xs.stopping" {
152 break;
153 }
154 if let Some((topic, ev)) = event_from_frame(&frame) {
155 let is_create = matches!(ev, Event::Create { .. });
156 let removes_active = matches!(ev, Event::Fin | Event::Stopped);
157 let state = states.entry(topic.clone()).or_default();
158 if let Event::Create { id } = &ev {
159 state.frames.insert(*id, frame.clone());
160 }
161 state.slots.apply(ev);
162 if is_create {
163 try_start(&topic, &frame, &mut active, &store).await;
164 } else if removes_active {
165 active.remove(&topic);
166 }
167 }
168 }
169 }
170 }
171
172 let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
173 for (_, handle) in active {
174 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
175 let _ = tokio::time::timeout(remaining, handle).await;
176 }
177
178 Ok(())
179}