Skip to main content

xs/processor/action/
serve.rs

1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::{value_to_json, ReturnOptions};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11#[derive(Clone)]
12struct Action {
13    id: Scru128Id,
14    engine: nu::Engine,
15    definition: String,
16    return_options: Option<ReturnOptions>,
17}
18
19async fn handle_define(
20    frame: &Frame,
21    name: &str,
22    store: &Store,
23    active: &mut HashMap<String, Action>,
24) {
25    match register_action(frame, store).await {
26        Ok(action) => {
27            active.insert(name.to_string(), action);
28            let _ = store.append(
29                Frame::builder(format!("xs.action.{name}.active"))
30                    .meta(serde_json::json!({
31                        "action_id": frame.id.to_string(),
32                    }))
33                    .build(),
34            );
35        }
36        Err(err) => {
37            // Parse / build failure: lifecycle .invalid (not the per-call .error).
38            let _ = store.append(
39                Frame::builder(format!("xs.action.{name}.invalid"))
40                    .meta(serde_json::json!({
41                        "action_id": frame.id.to_string(),
42                        "error": err.to_string(),
43                    }))
44                    .build(),
45            );
46        }
47    }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51    // Get definition from CAS
52    let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53    let definition_bytes = store.cas_read(hash).await?;
54    let definition = String::from_utf8(definition_bytes)?;
55
56    // Prepared base (Stream reads + Direct `.append`), then the VFS modules as
57    // of this frame. The per-invocation engine is a clone of this, so it only
58    // needs `set_append_meta` for the triggering frame.
59    let mut engine = nu::prepared_base(store, nu::ReadMode::Stream, true)?;
60    let modules = store.nu_modules_at(&frame.id);
61    nu::load_modules(&mut engine.state, store, &modules)?;
62
63    // Parse the action configuration
64    let nu_config = nu::parse_config(&mut engine, &definition)?;
65
66    // Deserialize action-specific options
67    #[derive(serde::Deserialize, Default)]
68    struct ActionOptions {
69        return_options: Option<ReturnOptions>,
70    }
71
72    let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
73
74    Ok(Action {
75        id: frame.id,
76        engine,
77        definition,
78        return_options: action_options.return_options,
79    })
80}
81
82#[instrument(
83    level = "info",
84    skip(action, frame, store),
85    fields(
86        message = %format!(
87            "action={id} frame={frame_id}:{topic}",
88            id = action.id, frame_id = frame.id, topic = frame.topic
89        )
90    )
91)]
92async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
93    let store = store.clone();
94    let frame = frame.clone();
95
96    tokio::task::spawn_blocking(move || {
97        let mut engine = action.engine;
98
99        // The base already carries `.append`; only the per-frame meta varies.
100        engine.set_append_meta(&serde_json::json!({
101            "action_id": action.id.to_string(),
102            "frame_id": frame.id.to_string()
103        }));
104
105        // Parse the action configuration to get the up-to-date closure with modules loaded
106        let nu_config = nu::parse_config(&mut engine, &action.definition)?;
107
108        // Run action and process pipeline
109        match run_action(&engine, nu_config.run_closure, &frame) {
110            Ok(pipeline_data) => {
111                let resp_suffix = action
112                    .return_options
113                    .as_ref()
114                    .and_then(|opts| opts.suffix.as_deref())
115                    .unwrap_or(".response");
116                let ttl = action
117                    .return_options
118                    .as_ref()
119                    .and_then(|opts| opts.ttl.clone());
120                let use_cas = action
121                    .return_options
122                    .as_ref()
123                    .and_then(|o| o.target.as_deref())
124                    .is_some_and(|t| t == "cas");
125
126                let topic = format!(
127                    "{topic}{suffix}",
128                    topic = frame.topic.strip_suffix(".call").unwrap(),
129                    suffix = resp_suffix
130                );
131
132                let mut base_meta = serde_json::json!({
133                    "action_id": action.id.to_string(),
134                    "frame_id": frame.id.to_string(),
135                });
136
137                if pipeline_data.is_nothing() {
138                    let _ = store.append(
139                        Frame::builder(topic)
140                            .maybe_ttl(ttl)
141                            .meta(base_meta)
142                            .build(),
143                    );
144                } else {
145                    let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
146                    if use_cas {
147                        let json_value = value_to_json(&value);
148                        let hash =
149                            store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
150                        let _ = store.append(
151                            Frame::builder(topic)
152                                .maybe_ttl(ttl)
153                                .hash(hash)
154                                .meta(base_meta)
155                                .build(),
156                        );
157                    } else {
158                        match &value {
159                            nu_protocol::Value::Record { .. } => {
160                                let json = value_to_json(&value);
161                                if let serde_json::Value::Object(map) = json {
162                                    for (k, v) in map {
163                                        base_meta[k] = v;
164                                    }
165                                }
166                                let _ = store.append(
167                                    Frame::builder(topic)
168                                        .maybe_ttl(ttl)
169                                        .meta(base_meta)
170                                        .build(),
171                                );
172                            }
173                            _ => {
174                                return Err(format!(
175                                    "Action output must be a record when target is not \"cas\"; got {}. \
176                                     Set return_options.target to \"cas\" for non-record output.",
177                                    value.get_type()
178                                ).into());
179                            }
180                        }
181                    }
182                }
183
184                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
185            }
186            Err(err) => {
187                // Emit error event instead of propagating
188                let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
189                let _ = store.append(
190                    Frame::builder(format!(
191                        "{topic}.error",
192                        topic = frame.topic.strip_suffix(".call").unwrap()
193                    ))
194                    .meta(serde_json::json!({
195                        "action_id": action.id.to_string(),
196                        "frame_id": frame.id.to_string(),
197                        "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
198                    }))
199                    .build(),
200                );
201
202                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
203            }
204        }
205    })
206    .await??;
207
208    Ok(())
209}
210
211fn run_action(
212    engine: &nu::Engine,
213    closure: nu_protocol::engine::Closure,
214    frame: &Frame,
215) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
216    let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
217
218    let mut engine_clone = engine.clone();
219    engine_clone.run_closure_in_job(
220        &closure,
221        vec![arg_val],
222        None,
223        format!("action {topic}", topic = frame.topic),
224    )
225}
226
227/// Translate `xs.action.<name>.<event>` topics into a lifecycle event.
228fn event_from_frame(
229    frame: &crate::store::Frame,
230) -> Option<(String, crate::processor::lifecycle::Event)> {
231    use crate::processor::lifecycle::Event;
232    let rest = frame.topic.strip_prefix("xs.action.")?;
233    let (name, ev_tag) = split_action_event(rest)?;
234    let event = match ev_tag {
235        "create" => Event::Create { id: frame.id },
236        "term" => Event::Term,
237        "active" => Event::Active {
238            source: source_id(frame)?,
239        },
240        "invalid" => Event::Invalid {
241            source: source_id(frame)?,
242        },
243        "fin.term" | "fin.replaced" => Event::Fin,
244        "replaced" => Event::Replaced,
245        _ => return None,
246    };
247    Some((name.to_string(), event))
248}
249
/// Splits `<name>.<event>` (the topic with `xs.action.` already removed) into
/// the action name and a recognized event tag.
///
/// Returns `None` when `rest` does not end in `.` followed by a known tag.
/// The name may itself contain dots; only the trailing tag is split off.
fn split_action_event(rest: &str) -> Option<(&str, &str)> {
    // Order matters: the compound `fin.*` tags are tried first so that
    // `foo.fin.term` yields (`foo`, `fin.term`) rather than (`foo.fin`, `term`).
    const TAGS: [&str; 7] = [
        "fin.term",
        "fin.replaced",
        "create",
        "term",
        "active",
        "invalid",
        "replaced",
    ];

    // Stripping the tag and then the separating dot matches on `.{tag}`
    // without building a temporary String for every candidate.
    TAGS.iter().find_map(|&tag| {
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
263
264fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
265    use std::str::FromStr;
266    let meta = frame.meta.as_ref()?;
267    let s = meta.get("action_id").and_then(|v| v.as_str())?;
268    scru128::Scru128Id::from_str(s).ok()
269}
270
271#[derive(Default)]
272struct TopicState {
273    slots: crate::processor::lifecycle::Slots,
274    /// Stash of every `.define` frame so threshold can look up by id.
275    frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
276}
277
278pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
279    let rx = store
280        .read(ReadOptions::builder().follow(FollowOption::On).build())
281        .await;
282    let mut lifecycle = LifecycleReader::new(rx);
283    let mut states: HashMap<String, TopicState> = HashMap::new();
284    let mut active: HashMap<String, Action> = HashMap::new();
285
286    while let Some(event) = lifecycle.recv().await {
287        match event {
288            Lifecycle::Historical(frame) => {
289                if let Some((name, ev)) = event_from_frame(&frame) {
290                    let state = states.entry(name).or_default();
291                    if let crate::processor::lifecycle::Event::Create { id } = &ev {
292                        state.frames.insert(*id, frame.clone());
293                    }
294                    state.slots.apply(ev);
295                }
296            }
297            Lifecycle::Threshold(_) => {
298                use crate::processor::lifecycle::ThresholdPick;
299                let mut picks: Vec<(String, ThresholdPick)> = states
300                    .iter()
301                    .map(|(t, s)| (t.clone(), s.slots.threshold()))
302                    .collect();
303                picks.sort_by_key(|(_, p)| match p {
304                    ThresholdPick::Start { id, .. } => Some(*id),
305                    ThresholdPick::None => None,
306                });
307                for (name, pick) in picks {
308                    if let ThresholdPick::Start { id, .. } = pick {
309                        if let Some(state) = states.get(&name) {
310                            if let Some(frame) = state.frames.get(&id).cloned() {
311                                handle_define(&frame, &name, &store, &mut active).await;
312                            }
313                        }
314                    }
315                }
316            }
317            Lifecycle::Live(frame) => {
318                use crate::processor::lifecycle::Event;
319                let mut handled_as_lifecycle = false;
320                if let Some((name, ev)) = event_from_frame(&frame) {
321                    handled_as_lifecycle = true;
322                    let is_create = matches!(ev, Event::Create { .. });
323                    let is_term = matches!(ev, Event::Term);
324                    let state = states.entry(name.clone()).or_default();
325                    if let Event::Create { id } = &ev {
326                        state.frames.insert(*id, frame.clone());
327                    }
328                    state.slots.apply(ev);
329                    if is_create {
330                        handle_define(&frame, &name, &store, &mut active).await;
331                    } else if is_term {
332                        // User-driven undefine: drop the action and emit ack.
333                        if active.remove(&name).is_some() {
334                            let _ = store.append(
335                                Frame::builder(format!("xs.action.{name}.fin.term"))
336                                    .meta(serde_json::json!({
337                                        "frame_id": frame.id.to_string(),
338                                    }))
339                                    .build(),
340                            );
341                        }
342                    }
343                }
344                // Per-invocation `.call` lives in the user namespace; it's
345                // not a lifecycle event.
346                if !handled_as_lifecycle {
347                    if let Some(name) = frame.topic.strip_suffix(".call") {
348                        let name = name.to_owned();
349                        if let Some(action) = active.get(&name) {
350                            let store = store.clone();
351                            let frame = frame.clone();
352                            let action = action.clone();
353                            tokio::spawn(async move {
354                                if let Err(e) = execute_action(action, &frame, &store).await {
355                                    tracing::error!("Failed to execute action '{}': {:?}", name, e);
356                                    // Per-call runtime errors stay in the
357                                    // user namespace; lifecycle `.invalid` is
358                                    // reserved for init-time failures.
359                                    let _ = store.append(
360                                        Frame::builder(format!("{name}.error"))
361                                            .meta(serde_json::json!({
362                                                "error": e.to_string(),
363                                                "call_id": frame.id.to_string(),
364                                            }))
365                                            .build(),
366                                    );
367                                }
368                            });
369                        }
370                    }
371                }
372            }
373        }
374    }
375
376    Ok(())
377}