Skip to main content

xs/processor/action/
serve.rs

1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::commands;
8use crate::nu::{value_to_json, ReturnOptions};
9use crate::processor::{Lifecycle, LifecycleReader};
10use crate::store::{FollowOption, Frame, ReadOptions, Store};
11
12#[derive(Clone)]
13struct Action {
14    id: Scru128Id,
15    engine: nu::Engine,
16    definition: String,
17    return_options: Option<ReturnOptions>,
18}
19
20async fn handle_define(
21    frame: &Frame,
22    name: &str,
23    store: &Store,
24    active: &mut HashMap<String, Action>,
25) {
26    match register_action(frame, store).await {
27        Ok(action) => {
28            active.insert(name.to_string(), action);
29            let _ = store.append(
30                Frame::builder(format!("xs.action.{name}.active"))
31                    .meta(serde_json::json!({
32                        "action_id": frame.id.to_string(),
33                    }))
34                    .build(),
35            );
36        }
37        Err(err) => {
38            // Parse / build failure: lifecycle .invalid (not the per-call .error).
39            let _ = store.append(
40                Frame::builder(format!("xs.action.{name}.invalid"))
41                    .meta(serde_json::json!({
42                        "action_id": frame.id.to_string(),
43                        "error": err.to_string(),
44                    }))
45                    .build(),
46            );
47        }
48    }
49}
50
51async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
52    // Get definition from CAS
53    let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
54    let definition_bytes = store.cas_read(hash).await?;
55    let definition = String::from_utf8(definition_bytes)?;
56
57    // Build engine from scratch with VFS modules at this point in the stream
58    let mut engine = crate::processor::build_engine(store, &frame.id)?;
59
60    // Add streaming .cat and .last (actions get the streaming versions)
61    engine.add_commands(vec![
62        Box::new(commands::cat_stream_command::CatStreamCommand::new(
63            store.clone(),
64        )),
65        Box::new(commands::last_stream_command::LastStreamCommand::new(
66            store.clone(),
67        )),
68    ])?;
69
70    // Parse the action configuration
71    let nu_config = nu::parse_config(&mut engine, &definition)?;
72
73    // Deserialize action-specific options
74    #[derive(serde::Deserialize, Default)]
75    struct ActionOptions {
76        return_options: Option<ReturnOptions>,
77    }
78
79    let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
80
81    Ok(Action {
82        id: frame.id,
83        engine,
84        definition,
85        return_options: action_options.return_options,
86    })
87}
88
89#[instrument(
90    level = "info",
91    skip(action, frame, store),
92    fields(
93        message = %format!(
94            "action={id} frame={frame_id}:{topic}",
95            id = action.id, frame_id = frame.id, topic = frame.topic
96        )
97    )
98)]
99async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
100    let store = store.clone();
101    let frame = frame.clone();
102
103    tokio::task::spawn_blocking(move || {
104        let base_meta = serde_json::json!({
105            "action_id": action.id.to_string(),
106            "frame_id": frame.id.to_string()
107        });
108
109        let mut engine = action.engine;
110
111        engine.add_commands(vec![Box::new(
112            commands::append_command::AppendCommand::new(store.clone(), base_meta),
113        )])?;
114
115        // Parse the action configuration to get the up-to-date closure with modules loaded
116        let nu_config = nu::parse_config(&mut engine, &action.definition)?;
117
118        // Run action and process pipeline
119        match run_action(&engine, nu_config.run_closure, &frame) {
120            Ok(pipeline_data) => {
121                let resp_suffix = action
122                    .return_options
123                    .as_ref()
124                    .and_then(|opts| opts.suffix.as_deref())
125                    .unwrap_or(".response");
126                let ttl = action
127                    .return_options
128                    .as_ref()
129                    .and_then(|opts| opts.ttl.clone());
130                let use_cas = action
131                    .return_options
132                    .as_ref()
133                    .and_then(|o| o.target.as_deref())
134                    .is_some_and(|t| t == "cas");
135
136                let topic = format!(
137                    "{topic}{suffix}",
138                    topic = frame.topic.strip_suffix(".call").unwrap(),
139                    suffix = resp_suffix
140                );
141
142                let mut base_meta = serde_json::json!({
143                    "action_id": action.id.to_string(),
144                    "frame_id": frame.id.to_string(),
145                });
146
147                if pipeline_data.is_nothing() {
148                    let _ = store.append(
149                        Frame::builder(topic)
150                            .maybe_ttl(ttl)
151                            .meta(base_meta)
152                            .build(),
153                    );
154                } else {
155                    let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
156                    if use_cas {
157                        let json_value = value_to_json(&value);
158                        let hash =
159                            store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
160                        let _ = store.append(
161                            Frame::builder(topic)
162                                .maybe_ttl(ttl)
163                                .hash(hash)
164                                .meta(base_meta)
165                                .build(),
166                        );
167                    } else {
168                        match &value {
169                            nu_protocol::Value::Record { .. } => {
170                                let json = value_to_json(&value);
171                                if let serde_json::Value::Object(map) = json {
172                                    for (k, v) in map {
173                                        base_meta[k] = v;
174                                    }
175                                }
176                                let _ = store.append(
177                                    Frame::builder(topic)
178                                        .maybe_ttl(ttl)
179                                        .meta(base_meta)
180                                        .build(),
181                                );
182                            }
183                            _ => {
184                                return Err(format!(
185                                    "Action output must be a record when target is not \"cas\"; got {}. \
186                                     Set return_options.target to \"cas\" for non-record output.",
187                                    value.get_type()
188                                ).into());
189                            }
190                        }
191                    }
192                }
193
194                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
195            }
196            Err(err) => {
197                // Emit error event instead of propagating
198                let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
199                let _ = store.append(
200                    Frame::builder(format!(
201                        "{topic}.error",
202                        topic = frame.topic.strip_suffix(".call").unwrap()
203                    ))
204                    .meta(serde_json::json!({
205                        "action_id": action.id.to_string(),
206                        "frame_id": frame.id.to_string(),
207                        "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
208                    }))
209                    .build(),
210                );
211
212                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
213            }
214        }
215    })
216    .await??;
217
218    Ok(())
219}
220
221fn run_action(
222    engine: &nu::Engine,
223    closure: nu_protocol::engine::Closure,
224    frame: &Frame,
225) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
226    let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
227
228    let mut engine_clone = engine.clone();
229    engine_clone.run_closure_in_job(
230        &closure,
231        vec![arg_val],
232        None,
233        format!("action {topic}", topic = frame.topic),
234    )
235}
236
237/// Translate `xs.action.<name>.<event>` topics into a lifecycle event.
238fn event_from_frame(
239    frame: &crate::store::Frame,
240) -> Option<(String, crate::processor::lifecycle::Event)> {
241    use crate::processor::lifecycle::Event;
242    let rest = frame.topic.strip_prefix("xs.action.")?;
243    let (name, ev_tag) = split_action_event(rest)?;
244    let event = match ev_tag {
245        "create" => Event::Create { id: frame.id },
246        "term" => Event::Term,
247        "active" => Event::Active {
248            source: source_id(frame)?,
249        },
250        "invalid" => Event::Invalid {
251            source: source_id(frame)?,
252        },
253        "fin.term" | "fin.replaced" => Event::Fin,
254        "replaced" => Event::Replaced,
255        _ => return None,
256    };
257    Some((name.to_string(), event))
258}
259
/// Splits `<name>.<tag>` into `(name, tag)` for the known lifecycle tags.
///
/// Tags are tried in precedence order: the two-segment `fin.*` tags come
/// first so that `foo.fin.term` yields `("foo", "fin.term")` rather than
/// `("foo.fin", "term")`. Returns `None` when no tag matches.
fn split_action_event(rest: &str) -> Option<(&str, &str)> {
    // Order matters: compound tags must be checked before their tails.
    const TAGS: [&str; 7] = [
        "fin.term",
        "fin.replaced",
        "create",
        "term",
        "active",
        "invalid",
        "replaced",
    ];

    TAGS.iter().find_map(|&tag| {
        // Peel off the tag, then the dot separating it from the name.
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
273
274fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
275    use std::str::FromStr;
276    let meta = frame.meta.as_ref()?;
277    let s = meta.get("action_id").and_then(|v| v.as_str())?;
278    scru128::Scru128Id::from_str(s).ok()
279}
280
281#[derive(Default)]
282struct TopicState {
283    slots: crate::processor::lifecycle::Slots,
284    /// Stash of every `.define` frame so threshold can look up by id.
285    frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
286}
287
288pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
289    let rx = store
290        .read(ReadOptions::builder().follow(FollowOption::On).build())
291        .await;
292    let mut lifecycle = LifecycleReader::new(rx);
293    let mut states: HashMap<String, TopicState> = HashMap::new();
294    let mut active: HashMap<String, Action> = HashMap::new();
295
296    while let Some(event) = lifecycle.recv().await {
297        match event {
298            Lifecycle::Historical(frame) => {
299                if let Some((name, ev)) = event_from_frame(&frame) {
300                    let state = states.entry(name).or_default();
301                    if let crate::processor::lifecycle::Event::Create { id } = &ev {
302                        state.frames.insert(*id, frame.clone());
303                    }
304                    state.slots.apply(ev);
305                }
306            }
307            Lifecycle::Threshold(_) => {
308                use crate::processor::lifecycle::ThresholdPick;
309                let mut picks: Vec<(String, ThresholdPick)> = states
310                    .iter()
311                    .map(|(t, s)| (t.clone(), s.slots.threshold()))
312                    .collect();
313                picks.sort_by_key(|(_, p)| match p {
314                    ThresholdPick::Start { id, .. } => Some(*id),
315                    ThresholdPick::None => None,
316                });
317                for (name, pick) in picks {
318                    if let ThresholdPick::Start { id, .. } = pick {
319                        if let Some(state) = states.get(&name) {
320                            if let Some(frame) = state.frames.get(&id).cloned() {
321                                handle_define(&frame, &name, &store, &mut active).await;
322                            }
323                        }
324                    }
325                }
326            }
327            Lifecycle::Live(frame) => {
328                use crate::processor::lifecycle::Event;
329                let mut handled_as_lifecycle = false;
330                if let Some((name, ev)) = event_from_frame(&frame) {
331                    handled_as_lifecycle = true;
332                    let is_create = matches!(ev, Event::Create { .. });
333                    let is_term = matches!(ev, Event::Term);
334                    let state = states.entry(name.clone()).or_default();
335                    if let Event::Create { id } = &ev {
336                        state.frames.insert(*id, frame.clone());
337                    }
338                    state.slots.apply(ev);
339                    if is_create {
340                        handle_define(&frame, &name, &store, &mut active).await;
341                    } else if is_term {
342                        // User-driven undefine: drop the action and emit ack.
343                        if active.remove(&name).is_some() {
344                            let _ = store.append(
345                                Frame::builder(format!("xs.action.{name}.fin.term"))
346                                    .meta(serde_json::json!({
347                                        "frame_id": frame.id.to_string(),
348                                    }))
349                                    .build(),
350                            );
351                        }
352                    }
353                }
354                // Per-invocation `.call` lives in the user namespace; it's
355                // not a lifecycle event.
356                if !handled_as_lifecycle {
357                    if let Some(name) = frame.topic.strip_suffix(".call") {
358                        let name = name.to_owned();
359                        if let Some(action) = active.get(&name) {
360                            let store = store.clone();
361                            let frame = frame.clone();
362                            let action = action.clone();
363                            tokio::spawn(async move {
364                                if let Err(e) = execute_action(action, &frame, &store).await {
365                                    tracing::error!("Failed to execute action '{}': {:?}", name, e);
366                                    // Per-call runtime errors stay in the
367                                    // user namespace; lifecycle `.invalid` is
368                                    // reserved for init-time failures.
369                                    let _ = store.append(
370                                        Frame::builder(format!("{name}.error"))
371                                            .meta(serde_json::json!({
372                                                "error": e.to_string(),
373                                                "call_id": frame.id.to_string(),
374                                            }))
375                                            .build(),
376                                    );
377                                }
378                            });
379                        }
380                    }
381                }
382            }
383        }
384    }
385
386    Ok(())
387}