Skip to main content

xs/processor/action/
serve.rs

1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::{value_to_json, ReturnOptions};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11#[derive(Clone)]
12struct Action {
13    id: Scru128Id,
14    engine: nu::Engine,
15    definition: String,
16    return_options: Option<ReturnOptions>,
17}
18
19async fn handle_define(
20    frame: &Frame,
21    name: &str,
22    store: &Store,
23    active: &mut HashMap<String, Action>,
24) {
25    match register_action(frame, store).await {
26        Ok(action) => {
27            active.insert(name.to_string(), action);
28            let _ = store.append(
29                Frame::builder(format!("xs.action.{name}.active"))
30                    .meta(serde_json::json!({
31                        "action_id": frame.id.to_string(),
32                    }))
33                    .build(),
34            );
35        }
36        Err(err) => {
37            // Parse / build failure: lifecycle .invalid (not the per-call .error).
38            let _ = store.append(
39                Frame::builder(format!("xs.action.{name}.invalid"))
40                    .meta(serde_json::json!({
41                        "action_id": frame.id.to_string(),
42                        "error": err.to_string(),
43                    }))
44                    .build(),
45            );
46        }
47    }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51    // Get definition from CAS
52    let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53    let definition_bytes = store.cas_read(hash).await?;
54    let definition = String::from_utf8(definition_bytes)?;
55
56    // Build engine from scratch with VFS modules at this point in the stream
57    let mut engine = crate::processor::build_engine(store, &frame.id)?;
58
59    // Actions read with the streaming variants; .append is added per-invocation
60    // in execute_action, since its base_meta depends on the triggering frame.
61    nu::add_read_commands(&mut engine, store, nu::ReadMode::Stream)?;
62
63    // Parse the action configuration
64    let nu_config = nu::parse_config(&mut engine, &definition)?;
65
66    // Deserialize action-specific options
67    #[derive(serde::Deserialize, Default)]
68    struct ActionOptions {
69        return_options: Option<ReturnOptions>,
70    }
71
72    let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
73
74    Ok(Action {
75        id: frame.id,
76        engine,
77        definition,
78        return_options: action_options.return_options,
79    })
80}
81
82#[instrument(
83    level = "info",
84    skip(action, frame, store),
85    fields(
86        message = %format!(
87            "action={id} frame={frame_id}:{topic}",
88            id = action.id, frame_id = frame.id, topic = frame.topic
89        )
90    )
91)]
92async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
93    let store = store.clone();
94    let frame = frame.clone();
95
96    tokio::task::spawn_blocking(move || {
97        let base_meta = serde_json::json!({
98            "action_id": action.id.to_string(),
99            "frame_id": frame.id.to_string()
100        });
101
102        let mut engine = action.engine;
103
104        nu::add_write_commands(&mut engine, &store, nu::AppendMode::Direct(base_meta))?;
105
106        // Parse the action configuration to get the up-to-date closure with modules loaded
107        let nu_config = nu::parse_config(&mut engine, &action.definition)?;
108
109        // Run action and process pipeline
110        match run_action(&engine, nu_config.run_closure, &frame) {
111            Ok(pipeline_data) => {
112                let resp_suffix = action
113                    .return_options
114                    .as_ref()
115                    .and_then(|opts| opts.suffix.as_deref())
116                    .unwrap_or(".response");
117                let ttl = action
118                    .return_options
119                    .as_ref()
120                    .and_then(|opts| opts.ttl.clone());
121                let use_cas = action
122                    .return_options
123                    .as_ref()
124                    .and_then(|o| o.target.as_deref())
125                    .is_some_and(|t| t == "cas");
126
127                let topic = format!(
128                    "{topic}{suffix}",
129                    topic = frame.topic.strip_suffix(".call").unwrap(),
130                    suffix = resp_suffix
131                );
132
133                let mut base_meta = serde_json::json!({
134                    "action_id": action.id.to_string(),
135                    "frame_id": frame.id.to_string(),
136                });
137
138                if pipeline_data.is_nothing() {
139                    let _ = store.append(
140                        Frame::builder(topic)
141                            .maybe_ttl(ttl)
142                            .meta(base_meta)
143                            .build(),
144                    );
145                } else {
146                    let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
147                    if use_cas {
148                        let json_value = value_to_json(&value);
149                        let hash =
150                            store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
151                        let _ = store.append(
152                            Frame::builder(topic)
153                                .maybe_ttl(ttl)
154                                .hash(hash)
155                                .meta(base_meta)
156                                .build(),
157                        );
158                    } else {
159                        match &value {
160                            nu_protocol::Value::Record { .. } => {
161                                let json = value_to_json(&value);
162                                if let serde_json::Value::Object(map) = json {
163                                    for (k, v) in map {
164                                        base_meta[k] = v;
165                                    }
166                                }
167                                let _ = store.append(
168                                    Frame::builder(topic)
169                                        .maybe_ttl(ttl)
170                                        .meta(base_meta)
171                                        .build(),
172                                );
173                            }
174                            _ => {
175                                return Err(format!(
176                                    "Action output must be a record when target is not \"cas\"; got {}. \
177                                     Set return_options.target to \"cas\" for non-record output.",
178                                    value.get_type()
179                                ).into());
180                            }
181                        }
182                    }
183                }
184
185                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
186            }
187            Err(err) => {
188                // Emit error event instead of propagating
189                let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
190                let _ = store.append(
191                    Frame::builder(format!(
192                        "{topic}.error",
193                        topic = frame.topic.strip_suffix(".call").unwrap()
194                    ))
195                    .meta(serde_json::json!({
196                        "action_id": action.id.to_string(),
197                        "frame_id": frame.id.to_string(),
198                        "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
199                    }))
200                    .build(),
201                );
202
203                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
204            }
205        }
206    })
207    .await??;
208
209    Ok(())
210}
211
212fn run_action(
213    engine: &nu::Engine,
214    closure: nu_protocol::engine::Closure,
215    frame: &Frame,
216) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
217    let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
218
219    let mut engine_clone = engine.clone();
220    engine_clone.run_closure_in_job(
221        &closure,
222        vec![arg_val],
223        None,
224        format!("action {topic}", topic = frame.topic),
225    )
226}
227
228/// Translate `xs.action.<name>.<event>` topics into a lifecycle event.
229fn event_from_frame(
230    frame: &crate::store::Frame,
231) -> Option<(String, crate::processor::lifecycle::Event)> {
232    use crate::processor::lifecycle::Event;
233    let rest = frame.topic.strip_prefix("xs.action.")?;
234    let (name, ev_tag) = split_action_event(rest)?;
235    let event = match ev_tag {
236        "create" => Event::Create { id: frame.id },
237        "term" => Event::Term,
238        "active" => Event::Active {
239            source: source_id(frame)?,
240        },
241        "invalid" => Event::Invalid {
242            source: source_id(frame)?,
243        },
244        "fin.term" | "fin.replaced" => Event::Fin,
245        "replaced" => Event::Replaced,
246        _ => return None,
247    };
248    Some((name.to_string(), event))
249}
250
/// Split `<name>.<tag>` into `(name, tag)` for the known lifecycle tags.
///
/// Tags are probed in table order, so the two-segment `fin.*` tags win over
/// the single-segment ones. Returns `None` when `rest` does not end in
/// `.<tag>` for any known tag.
fn split_action_event(rest: &str) -> Option<(&str, &str)> {
    // Two-segment tags first, then the single-segment ones.
    const EVENT_TAGS: [&str; 7] = [
        "fin.term",
        "fin.replaced",
        "create",
        "term",
        "active",
        "invalid",
        "replaced",
    ];

    EVENT_TAGS.iter().find_map(|&tag| {
        // Ends with ".<tag>"  <=>  ends with "<tag>" and what remains ends with '.'.
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
264
265fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
266    use std::str::FromStr;
267    let meta = frame.meta.as_ref()?;
268    let s = meta.get("action_id").and_then(|v| v.as_str())?;
269    scru128::Scru128Id::from_str(s).ok()
270}
271
272#[derive(Default)]
273struct TopicState {
274    slots: crate::processor::lifecycle::Slots,
275    /// Stash of every `.define` frame so threshold can look up by id.
276    frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
277}
278
279pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
280    let rx = store
281        .read(ReadOptions::builder().follow(FollowOption::On).build())
282        .await;
283    let mut lifecycle = LifecycleReader::new(rx);
284    let mut states: HashMap<String, TopicState> = HashMap::new();
285    let mut active: HashMap<String, Action> = HashMap::new();
286
287    while let Some(event) = lifecycle.recv().await {
288        match event {
289            Lifecycle::Historical(frame) => {
290                if let Some((name, ev)) = event_from_frame(&frame) {
291                    let state = states.entry(name).or_default();
292                    if let crate::processor::lifecycle::Event::Create { id } = &ev {
293                        state.frames.insert(*id, frame.clone());
294                    }
295                    state.slots.apply(ev);
296                }
297            }
298            Lifecycle::Threshold(_) => {
299                use crate::processor::lifecycle::ThresholdPick;
300                let mut picks: Vec<(String, ThresholdPick)> = states
301                    .iter()
302                    .map(|(t, s)| (t.clone(), s.slots.threshold()))
303                    .collect();
304                picks.sort_by_key(|(_, p)| match p {
305                    ThresholdPick::Start { id, .. } => Some(*id),
306                    ThresholdPick::None => None,
307                });
308                for (name, pick) in picks {
309                    if let ThresholdPick::Start { id, .. } = pick {
310                        if let Some(state) = states.get(&name) {
311                            if let Some(frame) = state.frames.get(&id).cloned() {
312                                handle_define(&frame, &name, &store, &mut active).await;
313                            }
314                        }
315                    }
316                }
317            }
318            Lifecycle::Live(frame) => {
319                use crate::processor::lifecycle::Event;
320                let mut handled_as_lifecycle = false;
321                if let Some((name, ev)) = event_from_frame(&frame) {
322                    handled_as_lifecycle = true;
323                    let is_create = matches!(ev, Event::Create { .. });
324                    let is_term = matches!(ev, Event::Term);
325                    let state = states.entry(name.clone()).or_default();
326                    if let Event::Create { id } = &ev {
327                        state.frames.insert(*id, frame.clone());
328                    }
329                    state.slots.apply(ev);
330                    if is_create {
331                        handle_define(&frame, &name, &store, &mut active).await;
332                    } else if is_term {
333                        // User-driven undefine: drop the action and emit ack.
334                        if active.remove(&name).is_some() {
335                            let _ = store.append(
336                                Frame::builder(format!("xs.action.{name}.fin.term"))
337                                    .meta(serde_json::json!({
338                                        "frame_id": frame.id.to_string(),
339                                    }))
340                                    .build(),
341                            );
342                        }
343                    }
344                }
345                // Per-invocation `.call` lives in the user namespace; it's
346                // not a lifecycle event.
347                if !handled_as_lifecycle {
348                    if let Some(name) = frame.topic.strip_suffix(".call") {
349                        let name = name.to_owned();
350                        if let Some(action) = active.get(&name) {
351                            let store = store.clone();
352                            let frame = frame.clone();
353                            let action = action.clone();
354                            tokio::spawn(async move {
355                                if let Err(e) = execute_action(action, &frame, &store).await {
356                                    tracing::error!("Failed to execute action '{}': {:?}", name, e);
357                                    // Per-call runtime errors stay in the
358                                    // user namespace; lifecycle `.invalid` is
359                                    // reserved for init-time failures.
360                                    let _ = store.append(
361                                        Frame::builder(format!("{name}.error"))
362                                            .meta(serde_json::json!({
363                                                "error": e.to_string(),
364                                                "call_id": frame.id.to_string(),
365                                            }))
366                                            .build(),
367                                    );
368                                }
369                            });
370                        }
371                    }
372                }
373            }
374        }
375    }
376
377    Ok(())
378}