1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::{value_to_json, ReturnOptions};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11#[derive(Clone)]
12struct Action {
13 id: Scru128Id,
14 engine: nu::Engine,
15 definition: String,
16 return_options: Option<ReturnOptions>,
17}
18
19async fn handle_define(
20 frame: &Frame,
21 name: &str,
22 store: &Store,
23 active: &mut HashMap<String, Action>,
24) {
25 match register_action(frame, store).await {
26 Ok(action) => {
27 active.insert(name.to_string(), action);
28 let _ = store.append(
29 Frame::builder(format!("xs.action.{name}.active"))
30 .meta(serde_json::json!({
31 "action_id": frame.id.to_string(),
32 }))
33 .build(),
34 );
35 }
36 Err(err) => {
37 let _ = store.append(
39 Frame::builder(format!("xs.action.{name}.invalid"))
40 .meta(serde_json::json!({
41 "action_id": frame.id.to_string(),
42 "error": err.to_string(),
43 }))
44 .build(),
45 );
46 }
47 }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53 let definition_bytes = store.cas_read(hash).await?;
54 let definition = String::from_utf8(definition_bytes)?;
55
56 let mut engine = nu::prepared_base(store, nu::ReadMode::Stream, true)?;
60 let modules = store.nu_modules_at(&frame.id);
61 nu::load_modules(&mut engine.state, store, &modules)?;
62
63 let nu_config = nu::parse_config(&mut engine, &definition)?;
65
66 #[derive(serde::Deserialize, Default)]
68 struct ActionOptions {
69 return_options: Option<ReturnOptions>,
70 }
71
72 let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
73
74 Ok(Action {
75 id: frame.id,
76 engine,
77 definition,
78 return_options: action_options.return_options,
79 })
80}
81
82#[instrument(
83 level = "info",
84 skip(action, frame, store),
85 fields(
86 message = %format!(
87 "action={id} frame={frame_id}:{topic}",
88 id = action.id, frame_id = frame.id, topic = frame.topic
89 )
90 )
91)]
92async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
93 let store = store.clone();
94 let frame = frame.clone();
95
96 tokio::task::spawn_blocking(move || {
97 let mut engine = action.engine;
98
99 engine.set_append_meta(&serde_json::json!({
101 "action_id": action.id.to_string(),
102 "frame_id": frame.id.to_string()
103 }));
104
105 let nu_config = nu::parse_config(&mut engine, &action.definition)?;
107
108 match run_action(&engine, nu_config.run_closure, &frame) {
110 Ok(pipeline_data) => {
111 let resp_suffix = action
112 .return_options
113 .as_ref()
114 .and_then(|opts| opts.suffix.as_deref())
115 .unwrap_or(".response");
116 let ttl = action
117 .return_options
118 .as_ref()
119 .and_then(|opts| opts.ttl.clone());
120 let use_cas = action
121 .return_options
122 .as_ref()
123 .and_then(|o| o.target.as_deref())
124 .is_some_and(|t| t == "cas");
125
126 let topic = format!(
127 "{topic}{suffix}",
128 topic = frame.topic.strip_suffix(".call").unwrap(),
129 suffix = resp_suffix
130 );
131
132 let mut base_meta = serde_json::json!({
133 "action_id": action.id.to_string(),
134 "frame_id": frame.id.to_string(),
135 });
136
137 if pipeline_data.is_nothing() {
138 let _ = store.append(
139 Frame::builder(topic)
140 .maybe_ttl(ttl)
141 .meta(base_meta)
142 .build(),
143 );
144 } else {
145 let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
146 if use_cas {
147 let json_value = value_to_json(&value);
148 let hash =
149 store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
150 let _ = store.append(
151 Frame::builder(topic)
152 .maybe_ttl(ttl)
153 .hash(hash)
154 .meta(base_meta)
155 .build(),
156 );
157 } else {
158 match &value {
159 nu_protocol::Value::Record { .. } => {
160 let json = value_to_json(&value);
161 if let serde_json::Value::Object(map) = json {
162 for (k, v) in map {
163 base_meta[k] = v;
164 }
165 }
166 let _ = store.append(
167 Frame::builder(topic)
168 .maybe_ttl(ttl)
169 .meta(base_meta)
170 .build(),
171 );
172 }
173 _ => {
174 return Err(format!(
175 "Action output must be a record when target is not \"cas\"; got {}. \
176 Set return_options.target to \"cas\" for non-record output.",
177 value.get_type()
178 ).into());
179 }
180 }
181 }
182 }
183
184 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
185 }
186 Err(err) => {
187 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
189 let _ = store.append(
190 Frame::builder(format!(
191 "{topic}.error",
192 topic = frame.topic.strip_suffix(".call").unwrap()
193 ))
194 .meta(serde_json::json!({
195 "action_id": action.id.to_string(),
196 "frame_id": frame.id.to_string(),
197 "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
198 }))
199 .build(),
200 );
201
202 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
203 }
204 }
205 })
206 .await??;
207
208 Ok(())
209}
210
211fn run_action(
212 engine: &nu::Engine,
213 closure: nu_protocol::engine::Closure,
214 frame: &Frame,
215) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
216 let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
217
218 let mut engine_clone = engine.clone();
219 engine_clone.run_closure_in_job(
220 &closure,
221 vec![arg_val],
222 None,
223 format!("action {topic}", topic = frame.topic),
224 )
225}
226
227fn event_from_frame(
229 frame: &crate::store::Frame,
230) -> Option<(String, crate::processor::lifecycle::Event)> {
231 use crate::processor::lifecycle::Event;
232 let rest = frame.topic.strip_prefix("xs.action.")?;
233 let (name, ev_tag) = split_action_event(rest)?;
234 let event = match ev_tag {
235 "create" => Event::Create { id: frame.id },
236 "term" => Event::Term,
237 "active" => Event::Active {
238 source: source_id(frame)?,
239 },
240 "invalid" => Event::Invalid {
241 source: source_id(frame)?,
242 },
243 "fin.term" | "fin.replaced" => Event::Fin,
244 "replaced" => Event::Replaced,
245 _ => return None,
246 };
247 Some((name.to_string(), event))
248}
249
250fn split_action_event(rest: &str) -> Option<(&str, &str)> {
251 for tag in ["fin.term", "fin.replaced"] {
252 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
253 return Some((name, tag));
254 }
255 }
256 for tag in ["create", "term", "active", "invalid", "replaced"] {
257 if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
258 return Some((name, tag));
259 }
260 }
261 None
262}
263
264fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
265 use std::str::FromStr;
266 let meta = frame.meta.as_ref()?;
267 let s = meta.get("action_id").and_then(|v| v.as_str())?;
268 scru128::Scru128Id::from_str(s).ok()
269}
270
271#[derive(Default)]
272struct TopicState {
273 slots: crate::processor::lifecycle::Slots,
274 frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
276}
277
278pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
279 let rx = store
280 .read(ReadOptions::builder().follow(FollowOption::On).build())
281 .await;
282 let mut lifecycle = LifecycleReader::new(rx);
283 let mut states: HashMap<String, TopicState> = HashMap::new();
284 let mut active: HashMap<String, Action> = HashMap::new();
285
286 while let Some(event) = lifecycle.recv().await {
287 match event {
288 Lifecycle::Historical(frame) => {
289 if let Some((name, ev)) = event_from_frame(&frame) {
290 let state = states.entry(name).or_default();
291 if let crate::processor::lifecycle::Event::Create { id } = &ev {
292 state.frames.insert(*id, frame.clone());
293 }
294 state.slots.apply(ev);
295 }
296 }
297 Lifecycle::Threshold(_) => {
298 use crate::processor::lifecycle::ThresholdPick;
299 let mut picks: Vec<(String, ThresholdPick)> = states
300 .iter()
301 .map(|(t, s)| (t.clone(), s.slots.threshold()))
302 .collect();
303 picks.sort_by_key(|(_, p)| match p {
304 ThresholdPick::Start { id, .. } => Some(*id),
305 ThresholdPick::None => None,
306 });
307 for (name, pick) in picks {
308 if let ThresholdPick::Start { id, .. } = pick {
309 if let Some(state) = states.get(&name) {
310 if let Some(frame) = state.frames.get(&id).cloned() {
311 handle_define(&frame, &name, &store, &mut active).await;
312 }
313 }
314 }
315 }
316 }
317 Lifecycle::Live(frame) => {
318 use crate::processor::lifecycle::Event;
319 let mut handled_as_lifecycle = false;
320 if let Some((name, ev)) = event_from_frame(&frame) {
321 handled_as_lifecycle = true;
322 let is_create = matches!(ev, Event::Create { .. });
323 let is_term = matches!(ev, Event::Term);
324 let state = states.entry(name.clone()).or_default();
325 if let Event::Create { id } = &ev {
326 state.frames.insert(*id, frame.clone());
327 }
328 state.slots.apply(ev);
329 if is_create {
330 handle_define(&frame, &name, &store, &mut active).await;
331 } else if is_term {
332 if active.remove(&name).is_some() {
334 let _ = store.append(
335 Frame::builder(format!("xs.action.{name}.fin.term"))
336 .meta(serde_json::json!({
337 "frame_id": frame.id.to_string(),
338 }))
339 .build(),
340 );
341 }
342 }
343 }
344 if !handled_as_lifecycle {
347 if let Some(name) = frame.topic.strip_suffix(".call") {
348 let name = name.to_owned();
349 if let Some(action) = active.get(&name) {
350 let store = store.clone();
351 let frame = frame.clone();
352 let action = action.clone();
353 tokio::spawn(async move {
354 if let Err(e) = execute_action(action, &frame, &store).await {
355 tracing::error!("Failed to execute action '{}': {:?}", name, e);
356 let _ = store.append(
360 Frame::builder(format!("{name}.error"))
361 .meta(serde_json::json!({
362 "error": e.to_string(),
363 "call_id": frame.id.to_string(),
364 }))
365 .build(),
366 );
367 }
368 });
369 }
370 }
371 }
372 }
373 }
374 }
375
376 Ok(())
377}