xs/processor/service/
serve.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use serde_json::json;
5use tokio::task::JoinHandle;
6
7use crate::processor::service::service;
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11async fn try_start(
12 topic: &str,
13 frame: &Frame,
14 active: &mut HashMap<String, JoinHandle<()>>,
15 store: &Store,
16) {
17 if let Err(e) = handle_spawn_event(topic, frame.clone(), active, store.clone()).await {
18 let meta = json!({
19 "source_id": frame.id.to_string(),
20 "reason": e.to_string()
21 });
22
23 if let Err(e) = store.append(
24 Frame::builder(format!("{topic}.parse.error"))
25 .meta(meta)
26 .build(),
27 ) {
28 tracing::error!("Error appending error frame: {}", e);
29 }
30 }
31}
32
33async fn handle_spawn_event(
34 topic: &str,
35 frame: Frame,
36 active: &mut HashMap<String, JoinHandle<()>>,
37 store: Store,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39 let key = topic.to_string();
40 if let Some(handle) = active.get(&key) {
41 if handle.is_finished() {
42 active.remove(&key);
43 } else {
44 return Ok(());
48 }
49 }
50
51 let handle = service::spawn(store, frame);
52 active.insert(key, handle);
53 Ok(())
54}
55
56pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
57 let rx = store
58 .read(ReadOptions::builder().follow(FollowOption::On).build())
59 .await;
60 let mut lifecycle = LifecycleReader::new(rx);
61 let mut compacted: HashMap<String, Frame> = HashMap::new();
62 let mut active: HashMap<String, JoinHandle<()>> = HashMap::new();
63
64 while let Some(event) = lifecycle.recv().await {
65 match event {
66 Lifecycle::Historical(frame) => {
67 if let Some(prefix) = frame
68 .topic
69 .strip_suffix(".parse.error")
70 .or_else(|| frame.topic.strip_suffix(".spawn"))
71 {
72 compacted.insert(prefix.to_string(), frame);
73 } else if let Some(prefix) = frame.topic.strip_suffix(".terminate") {
74 compacted.remove(prefix);
75 }
76 }
77 Lifecycle::Threshold(_) => {
78 for (topic, frame) in compacted.drain() {
79 if frame.topic.ends_with(".spawn") {
80 try_start(&topic, &frame, &mut active, &store).await;
81 }
82 }
83 }
84 Lifecycle::Live(frame) => {
85 if frame.topic == "xs.stopping" {
86 break;
87 }
88 if let Some(prefix) = frame.topic.strip_suffix(".spawn") {
89 try_start(prefix, &frame, &mut active, &store).await;
90 } else if let Some(prefix) = frame.topic.strip_suffix(".shutdown") {
91 active.remove(prefix);
92 }
93 }
94 }
95 }
96
97 let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
98 for (_, handle) in active {
99 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
100 let _ = tokio::time::timeout(remaining, handle).await;
101 }
102
103 Ok(())
104}