Skip to main content

xs/processor/service/
serve.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use serde_json::json;
5use tokio::task::JoinHandle;
6
7use crate::processor::service::service;
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11async fn try_start(
12    topic: &str,
13    frame: &Frame,
14    active: &mut HashMap<String, JoinHandle<()>>,
15    store: &Store,
16) {
17    if let Err(e) = handle_spawn_event(topic, frame.clone(), active, store.clone()).await {
18        let meta = json!({
19            "source_id": frame.id.to_string(),
20            "reason": e.to_string()
21        });
22
23        if let Err(e) = store.append(
24            Frame::builder(format!("{topic}.parse.error"))
25                .meta(meta)
26                .build(),
27        ) {
28            tracing::error!("Error appending error frame: {}", e);
29        }
30    }
31}
32
33async fn handle_spawn_event(
34    topic: &str,
35    frame: Frame,
36    active: &mut HashMap<String, JoinHandle<()>>,
37    store: Store,
38) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
39    let key = topic.to_string();
40    if let Some(handle) = active.get(&key) {
41        if handle.is_finished() {
42            active.remove(&key);
43        } else {
44            // A service for this topic is already running. Ignore the
45            // new spawn frame; the running service will handle it as a hot
46            // reload.
47            return Ok(());
48        }
49    }
50
51    let handle = service::spawn(store, frame);
52    active.insert(key, handle);
53    Ok(())
54}
55
56pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
57    let rx = store
58        .read(ReadOptions::builder().follow(FollowOption::On).build())
59        .await;
60    let mut lifecycle = LifecycleReader::new(rx);
61    let mut compacted: HashMap<String, Frame> = HashMap::new();
62    let mut active: HashMap<String, JoinHandle<()>> = HashMap::new();
63
64    while let Some(event) = lifecycle.recv().await {
65        match event {
66            Lifecycle::Historical(frame) => {
67                if let Some(prefix) = frame
68                    .topic
69                    .strip_suffix(".parse.error")
70                    .or_else(|| frame.topic.strip_suffix(".spawn"))
71                {
72                    compacted.insert(prefix.to_string(), frame);
73                } else if let Some(prefix) = frame.topic.strip_suffix(".terminate") {
74                    compacted.remove(prefix);
75                }
76            }
77            Lifecycle::Threshold(_) => {
78                for (topic, frame) in compacted.drain() {
79                    if frame.topic.ends_with(".spawn") {
80                        try_start(&topic, &frame, &mut active, &store).await;
81                    }
82                }
83            }
84            Lifecycle::Live(frame) => {
85                if frame.topic == "xs.stopping" {
86                    break;
87                }
88                if let Some(prefix) = frame.topic.strip_suffix(".spawn") {
89                    try_start(prefix, &frame, &mut active, &store).await;
90                } else if let Some(prefix) = frame.topic.strip_suffix(".shutdown") {
91                    active.remove(prefix);
92                }
93            }
94        }
95    }
96
97    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
98    for (_, handle) in active {
99        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
100        let _ = tokio::time::timeout(remaining, handle).await;
101    }
102
103    Ok(())
104}