1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::commands;
8use crate::nu::{value_to_json, ReturnOptions};
9use crate::processor::{Lifecycle, LifecycleReader};
10use crate::store::{FollowOption, Frame, ReadOptions, Store};
11
12#[derive(Clone)]
13struct Action {
14 id: Scru128Id,
15 engine: nu::Engine,
16 definition: String,
17 return_options: Option<ReturnOptions>,
18}
19
20async fn handle_define(
21 frame: &Frame,
22 name: &str,
23 store: &Store,
24 active: &mut HashMap<String, Action>,
25) {
26 match register_action(frame, store).await {
27 Ok(action) => {
28 active.insert(name.to_string(), action);
29 let _ = store.append(
30 Frame::builder(format!("xs.action.{name}.active"))
31 .meta(serde_json::json!({
32 "action_id": frame.id.to_string(),
33 }))
34 .build(),
35 );
36 }
37 Err(err) => {
38 let _ = store.append(
40 Frame::builder(format!("xs.action.{name}.invalid"))
41 .meta(serde_json::json!({
42 "action_id": frame.id.to_string(),
43 "error": err.to_string(),
44 }))
45 .build(),
46 );
47 }
48 }
49}
50
51async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
52 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
54 let definition_bytes = store.cas_read(hash).await?;
55 let definition = String::from_utf8(definition_bytes)?;
56
57 let mut engine = crate::processor::build_engine(store, &frame.id)?;
59
60 engine.add_commands(vec![
62 Box::new(commands::cat_stream_command::CatStreamCommand::new(
63 store.clone(),
64 )),
65 Box::new(commands::last_stream_command::LastStreamCommand::new(
66 store.clone(),
67 )),
68 ])?;
69
70 let nu_config = nu::parse_config(&mut engine, &definition)?;
72
73 #[derive(serde::Deserialize, Default)]
75 struct ActionOptions {
76 return_options: Option<ReturnOptions>,
77 }
78
79 let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
80
81 Ok(Action {
82 id: frame.id,
83 engine,
84 definition,
85 return_options: action_options.return_options,
86 })
87}
88
89#[instrument(
90 level = "info",
91 skip(action, frame, store),
92 fields(
93 message = %format!(
94 "action={id} frame={frame_id}:{topic}",
95 id = action.id, frame_id = frame.id, topic = frame.topic
96 )
97 )
98)]
99async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
100 let store = store.clone();
101 let frame = frame.clone();
102
103 tokio::task::spawn_blocking(move || {
104 let base_meta = serde_json::json!({
105 "action_id": action.id.to_string(),
106 "frame_id": frame.id.to_string()
107 });
108
109 let mut engine = action.engine;
110
111 engine.add_commands(vec![Box::new(
112 commands::append_command::AppendCommand::new(store.clone(), base_meta),
113 )])?;
114
115 let nu_config = nu::parse_config(&mut engine, &action.definition)?;
117
118 match run_action(&engine, nu_config.run_closure, &frame) {
120 Ok(pipeline_data) => {
121 let resp_suffix = action
122 .return_options
123 .as_ref()
124 .and_then(|opts| opts.suffix.as_deref())
125 .unwrap_or(".response");
126 let ttl = action
127 .return_options
128 .as_ref()
129 .and_then(|opts| opts.ttl.clone());
130 let use_cas = action
131 .return_options
132 .as_ref()
133 .and_then(|o| o.target.as_deref())
134 .is_some_and(|t| t == "cas");
135
136 let topic = format!(
137 "{topic}{suffix}",
138 topic = frame.topic.strip_suffix(".call").unwrap(),
139 suffix = resp_suffix
140 );
141
142 let mut base_meta = serde_json::json!({
143 "action_id": action.id.to_string(),
144 "frame_id": frame.id.to_string(),
145 });
146
147 if pipeline_data.is_nothing() {
148 let _ = store.append(
149 Frame::builder(topic)
150 .maybe_ttl(ttl)
151 .meta(base_meta)
152 .build(),
153 );
154 } else {
155 let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
156 if use_cas {
157 let json_value = value_to_json(&value);
158 let hash =
159 store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
160 let _ = store.append(
161 Frame::builder(topic)
162 .maybe_ttl(ttl)
163 .hash(hash)
164 .meta(base_meta)
165 .build(),
166 );
167 } else {
168 match &value {
169 nu_protocol::Value::Record { .. } => {
170 let json = value_to_json(&value);
171 if let serde_json::Value::Object(map) = json {
172 for (k, v) in map {
173 base_meta[k] = v;
174 }
175 }
176 let _ = store.append(
177 Frame::builder(topic)
178 .maybe_ttl(ttl)
179 .meta(base_meta)
180 .build(),
181 );
182 }
183 _ => {
184 return Err(format!(
185 "Action output must be a record when target is not \"cas\"; got {}. \
186 Set return_options.target to \"cas\" for non-record output.",
187 value.get_type()
188 ).into());
189 }
190 }
191 }
192 }
193
194 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
195 }
196 Err(err) => {
197 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
199 let _ = store.append(
200 Frame::builder(format!(
201 "{topic}.error",
202 topic = frame.topic.strip_suffix(".call").unwrap()
203 ))
204 .meta(serde_json::json!({
205 "action_id": action.id.to_string(),
206 "frame_id": frame.id.to_string(),
207 "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
208 }))
209 .build(),
210 );
211
212 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
213 }
214 }
215 })
216 .await??;
217
218 Ok(())
219}
220
221fn run_action(
222 engine: &nu::Engine,
223 closure: nu_protocol::engine::Closure,
224 frame: &Frame,
225) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
226 let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
227
228 let mut engine_clone = engine.clone();
229 engine_clone.run_closure_in_job(
230 &closure,
231 vec![arg_val],
232 None,
233 format!("action {topic}", topic = frame.topic),
234 )
235}
236
237fn event_from_frame(
239 frame: &crate::store::Frame,
240) -> Option<(String, crate::processor::lifecycle::Event)> {
241 use crate::processor::lifecycle::Event;
242 let rest = frame.topic.strip_prefix("xs.action.")?;
243 let (name, ev_tag) = split_action_event(rest)?;
244 let event = match ev_tag {
245 "create" => Event::Create { id: frame.id },
246 "term" => Event::Term,
247 "active" => Event::Active {
248 source: source_id(frame)?,
249 },
250 "invalid" => Event::Invalid {
251 source: source_id(frame)?,
252 },
253 "fin.term" | "fin.replaced" => Event::Fin,
254 "replaced" => Event::Replaced,
255 _ => return None,
256 };
257 Some((name.to_string(), event))
258}
259
260fn split_action_event(rest: &str) -> Option<(&str, &str)> {
261 for tag in ["fin.term", "fin.replaced"] {
262 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
263 return Some((name, tag));
264 }
265 }
266 for tag in ["create", "term", "active", "invalid", "replaced"] {
267 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
268 return Some((name, tag));
269 }
270 }
271 None
272}
273
274fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
275 use std::str::FromStr;
276 let meta = frame.meta.as_ref()?;
277 let s = meta.get("action_id").and_then(|v| v.as_str())?;
278 scru128::Scru128Id::from_str(s).ok()
279}
280
281#[derive(Default)]
282struct TopicState {
283 slots: crate::processor::lifecycle::Slots,
284 frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
286}
287
288pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
289 let rx = store
290 .read(ReadOptions::builder().follow(FollowOption::On).build())
291 .await;
292 let mut lifecycle = LifecycleReader::new(rx);
293 let mut states: HashMap<String, TopicState> = HashMap::new();
294 let mut active: HashMap<String, Action> = HashMap::new();
295
296 while let Some(event) = lifecycle.recv().await {
297 match event {
298 Lifecycle::Historical(frame) => {
299 if let Some((name, ev)) = event_from_frame(&frame) {
300 let state = states.entry(name).or_default();
301 if let crate::processor::lifecycle::Event::Create { id } = &ev {
302 state.frames.insert(*id, frame.clone());
303 }
304 state.slots.apply(ev);
305 }
306 }
307 Lifecycle::Threshold(_) => {
308 use crate::processor::lifecycle::ThresholdPick;
309 let mut picks: Vec<(String, ThresholdPick)> = states
310 .iter()
311 .map(|(t, s)| (t.clone(), s.slots.threshold()))
312 .collect();
313 picks.sort_by_key(|(_, p)| match p {
314 ThresholdPick::Start { id, .. } => Some(*id),
315 ThresholdPick::None => None,
316 });
317 for (name, pick) in picks {
318 if let ThresholdPick::Start { id, .. } = pick {
319 if let Some(state) = states.get(&name) {
320 if let Some(frame) = state.frames.get(&id).cloned() {
321 handle_define(&frame, &name, &store, &mut active).await;
322 }
323 }
324 }
325 }
326 }
327 Lifecycle::Live(frame) => {
328 use crate::processor::lifecycle::Event;
329 let mut handled_as_lifecycle = false;
330 if let Some((name, ev)) = event_from_frame(&frame) {
331 handled_as_lifecycle = true;
332 let is_create = matches!(ev, Event::Create { .. });
333 let is_term = matches!(ev, Event::Term);
334 let state = states.entry(name.clone()).or_default();
335 if let Event::Create { id } = &ev {
336 state.frames.insert(*id, frame.clone());
337 }
338 state.slots.apply(ev);
339 if is_create {
340 handle_define(&frame, &name, &store, &mut active).await;
341 } else if is_term {
342 if active.remove(&name).is_some() {
344 let _ = store.append(
345 Frame::builder(format!("xs.action.{name}.fin.term"))
346 .meta(serde_json::json!({
347 "frame_id": frame.id.to_string(),
348 }))
349 .build(),
350 );
351 }
352 }
353 }
354 if !handled_as_lifecycle {
357 if let Some(name) = frame.topic.strip_suffix(".call") {
358 let name = name.to_owned();
359 if let Some(action) = active.get(&name) {
360 let store = store.clone();
361 let frame = frame.clone();
362 let action = action.clone();
363 tokio::spawn(async move {
364 if let Err(e) = execute_action(action, &frame, &store).await {
365 tracing::error!("Failed to execute action '{}': {:?}", name, e);
366 let _ = store.append(
370 Frame::builder(format!("{name}.error"))
371 .meta(serde_json::json!({
372 "error": e.to_string(),
373 "call_id": frame.id.to_string(),
374 }))
375 .build(),
376 );
377 }
378 });
379 }
380 }
381 }
382 }
383 }
384 }
385
386 Ok(())
387}