Skip to main content

xs/processor/action/
serve.rs

1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::commands;
8use crate::nu::{value_to_json, ReturnOptions};
9use crate::processor::{Lifecycle, LifecycleReader};
10use crate::store::{FollowOption, Frame, ReadOptions, Store};
11
12#[derive(Clone)]
13struct Action {
14    id: Scru128Id,
15    engine: nu::Engine,
16    definition: String,
17    return_options: Option<ReturnOptions>,
18}
19
20async fn handle_define(
21    frame: &Frame,
22    name: &str,
23    store: &Store,
24    active: &mut HashMap<String, Action>,
25) {
26    match register_action(frame, store).await {
27        Ok(action) => {
28            active.insert(name.to_string(), action);
29            let _ = store.append(
30                Frame::builder(format!("{name}.ready"))
31                    .meta(serde_json::json!({
32                        "action_id": frame.id.to_string(),
33                    }))
34                    .build(),
35            );
36        }
37        Err(err) => {
38            let _ = store.append(
39                Frame::builder(format!("{name}.error"))
40                    .meta(serde_json::json!({
41                        "action_id": frame.id.to_string(),
42                        "error": err.to_string(),
43                    }))
44                    .build(),
45            );
46        }
47    }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51    // Get definition from CAS
52    let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53    let definition_bytes = store.cas_read(hash).await?;
54    let definition = String::from_utf8(definition_bytes)?;
55
56    // Build engine from scratch with VFS modules at this point in the stream
57    let mut engine = crate::processor::build_engine(store, &frame.id)?;
58
59    // Add streaming .cat and .last (actions get the streaming versions)
60    engine.add_commands(vec![
61        Box::new(commands::cat_stream_command::CatStreamCommand::new(
62            store.clone(),
63        )),
64        Box::new(commands::last_stream_command::LastStreamCommand::new(
65            store.clone(),
66        )),
67    ])?;
68
69    // Parse the action configuration
70    let nu_config = nu::parse_config(&mut engine, &definition)?;
71
72    // Deserialize action-specific options
73    #[derive(serde::Deserialize, Default)]
74    struct ActionOptions {
75        return_options: Option<ReturnOptions>,
76    }
77
78    let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
79
80    Ok(Action {
81        id: frame.id,
82        engine,
83        definition,
84        return_options: action_options.return_options,
85    })
86}
87
88#[instrument(
89    level = "info",
90    skip(action, frame, store),
91    fields(
92        message = %format!(
93            "action={id} frame={frame_id}:{topic}",
94            id = action.id, frame_id = frame.id, topic = frame.topic
95        )
96    )
97)]
98async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
99    let store = store.clone();
100    let frame = frame.clone();
101
102    tokio::task::spawn_blocking(move || {
103        let base_meta = serde_json::json!({
104            "action_id": action.id.to_string(),
105            "frame_id": frame.id.to_string()
106        });
107
108        let mut engine = action.engine;
109
110        engine.add_commands(vec![Box::new(
111            commands::append_command::AppendCommand::new(store.clone(), base_meta),
112        )])?;
113
114        // Parse the action configuration to get the up-to-date closure with modules loaded
115        let nu_config = nu::parse_config(&mut engine, &action.definition)?;
116
117        // Run action and process pipeline
118        match run_action(&engine, nu_config.run_closure, &frame) {
119            Ok(pipeline_data) => {
120                let resp_suffix = action
121                    .return_options
122                    .as_ref()
123                    .and_then(|opts| opts.suffix.as_deref())
124                    .unwrap_or(".response");
125                let ttl = action
126                    .return_options
127                    .as_ref()
128                    .and_then(|opts| opts.ttl.clone());
129                let use_cas = action
130                    .return_options
131                    .as_ref()
132                    .and_then(|o| o.target.as_deref())
133                    .is_some_and(|t| t == "cas");
134
135                let topic = format!(
136                    "{topic}{suffix}",
137                    topic = frame.topic.strip_suffix(".call").unwrap(),
138                    suffix = resp_suffix
139                );
140
141                let mut base_meta = serde_json::json!({
142                    "action_id": action.id.to_string(),
143                    "frame_id": frame.id.to_string(),
144                });
145
146                if pipeline_data.is_nothing() {
147                    let _ = store.append(
148                        Frame::builder(topic)
149                            .maybe_ttl(ttl)
150                            .meta(base_meta)
151                            .build(),
152                    );
153                } else {
154                    let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
155                    if use_cas {
156                        let json_value = value_to_json(&value);
157                        let hash =
158                            store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
159                        let _ = store.append(
160                            Frame::builder(topic)
161                                .maybe_ttl(ttl)
162                                .hash(hash)
163                                .meta(base_meta)
164                                .build(),
165                        );
166                    } else {
167                        match &value {
168                            nu_protocol::Value::Record { .. } => {
169                                let json = value_to_json(&value);
170                                if let serde_json::Value::Object(map) = json {
171                                    for (k, v) in map {
172                                        base_meta[k] = v;
173                                    }
174                                }
175                                let _ = store.append(
176                                    Frame::builder(topic)
177                                        .maybe_ttl(ttl)
178                                        .meta(base_meta)
179                                        .build(),
180                                );
181                            }
182                            _ => {
183                                return Err(format!(
184                                    "Action output must be a record when target is not \"cas\"; got {}. \
185                                     Set return_options.target to \"cas\" for non-record output.",
186                                    value.get_type()
187                                ).into());
188                            }
189                        }
190                    }
191                }
192
193                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
194            }
195            Err(err) => {
196                // Emit error event instead of propagating
197                let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
198                let _ = store.append(
199                    Frame::builder(format!(
200                        "{topic}.error",
201                        topic = frame.topic.strip_suffix(".call").unwrap()
202                    ))
203                    .meta(serde_json::json!({
204                        "action_id": action.id.to_string(),
205                        "frame_id": frame.id.to_string(),
206                        "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
207                    }))
208                    .build(),
209                );
210
211                Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
212            }
213        }
214    })
215    .await??;
216
217    Ok(())
218}
219
220fn run_action(
221    engine: &nu::Engine,
222    closure: nu_protocol::engine::Closure,
223    frame: &Frame,
224) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
225    let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
226
227    let mut engine_clone = engine.clone();
228    engine_clone.run_closure_in_job(
229        &closure,
230        vec![arg_val],
231        None,
232        format!("action {topic}", topic = frame.topic),
233    )
234}
235
236pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
237    let rx = store
238        .read(ReadOptions::builder().follow(FollowOption::On).build())
239        .await;
240    let mut lifecycle = LifecycleReader::new(rx);
241    let mut compacted: HashMap<String, Frame> = HashMap::new();
242    let mut active: HashMap<String, Action> = HashMap::new();
243
244    while let Some(event) = lifecycle.recv().await {
245        match event {
246            Lifecycle::Historical(frame) => {
247                if let Some(name) = frame.topic.strip_suffix(".define") {
248                    compacted.insert(name.to_string(), frame);
249                }
250            }
251            Lifecycle::Threshold(_) => {
252                let mut ordered: Vec<_> = compacted.drain().collect();
253                ordered.sort_by_key(|(_, frame)| frame.id);
254
255                for (name, frame) in ordered {
256                    handle_define(&frame, &name, &store, &mut active).await;
257                }
258            }
259            Lifecycle::Live(frame) => {
260                if let Some(name) = frame.topic.strip_suffix(".define") {
261                    handle_define(&frame, name, &store, &mut active).await;
262                } else if let Some(name) = frame.topic.strip_suffix(".call") {
263                    let name = name.to_owned();
264                    if let Some(action) = active.get(&name) {
265                        let store = store.clone();
266                        let frame = frame.clone();
267                        let action = action.clone();
268                        tokio::spawn(async move {
269                            if let Err(e) = execute_action(action, &frame, &store).await {
270                                tracing::error!("Failed to execute action '{}': {:?}", name, e);
271                                let _ = store.append(
272                                    Frame::builder(format!("{name}.error"))
273                                        .meta(serde_json::json!({
274                                            "error": e.to_string(),
275                                        }))
276                                        .build(),
277                                );
278                            }
279                        });
280                    }
281                }
282            }
283        }
284    }
285
286    Ok(())
287}