1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::{value_to_json, ReturnOptions};
8use crate::processor::{Lifecycle, LifecycleReader};
9use crate::store::{FollowOption, Frame, ReadOptions, Store};
10
11#[derive(Clone)]
12struct Action {
13 id: Scru128Id,
14 engine: nu::Engine,
15 definition: String,
16 return_options: Option<ReturnOptions>,
17}
18
19async fn handle_define(
20 frame: &Frame,
21 name: &str,
22 store: &Store,
23 active: &mut HashMap<String, Action>,
24) {
25 match register_action(frame, store).await {
26 Ok(action) => {
27 active.insert(name.to_string(), action);
28 let _ = store.append(
29 Frame::builder(format!("xs.action.{name}.active"))
30 .meta(serde_json::json!({
31 "action_id": frame.id.to_string(),
32 }))
33 .build(),
34 );
35 }
36 Err(err) => {
37 let _ = store.append(
39 Frame::builder(format!("xs.action.{name}.invalid"))
40 .meta(serde_json::json!({
41 "action_id": frame.id.to_string(),
42 "error": err.to_string(),
43 }))
44 .build(),
45 );
46 }
47 }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53 let definition_bytes = store.cas_read(hash).await?;
54 let definition = String::from_utf8(definition_bytes)?;
55
56 let mut engine = crate::processor::build_engine(store, &frame.id)?;
58
59 nu::add_read_commands(&mut engine, store, nu::ReadMode::Stream)?;
62
63 let nu_config = nu::parse_config(&mut engine, &definition)?;
65
66 #[derive(serde::Deserialize, Default)]
68 struct ActionOptions {
69 return_options: Option<ReturnOptions>,
70 }
71
72 let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
73
74 Ok(Action {
75 id: frame.id,
76 engine,
77 definition,
78 return_options: action_options.return_options,
79 })
80}
81
82#[instrument(
83 level = "info",
84 skip(action, frame, store),
85 fields(
86 message = %format!(
87 "action={id} frame={frame_id}:{topic}",
88 id = action.id, frame_id = frame.id, topic = frame.topic
89 )
90 )
91)]
92async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
93 let store = store.clone();
94 let frame = frame.clone();
95
96 tokio::task::spawn_blocking(move || {
97 let base_meta = serde_json::json!({
98 "action_id": action.id.to_string(),
99 "frame_id": frame.id.to_string()
100 });
101
102 let mut engine = action.engine;
103
104 nu::add_write_commands(&mut engine, &store, nu::AppendMode::Direct(base_meta))?;
105
106 let nu_config = nu::parse_config(&mut engine, &action.definition)?;
108
109 match run_action(&engine, nu_config.run_closure, &frame) {
111 Ok(pipeline_data) => {
112 let resp_suffix = action
113 .return_options
114 .as_ref()
115 .and_then(|opts| opts.suffix.as_deref())
116 .unwrap_or(".response");
117 let ttl = action
118 .return_options
119 .as_ref()
120 .and_then(|opts| opts.ttl.clone());
121 let use_cas = action
122 .return_options
123 .as_ref()
124 .and_then(|o| o.target.as_deref())
125 .is_some_and(|t| t == "cas");
126
127 let topic = format!(
128 "{topic}{suffix}",
129 topic = frame.topic.strip_suffix(".call").unwrap(),
130 suffix = resp_suffix
131 );
132
133 let mut base_meta = serde_json::json!({
134 "action_id": action.id.to_string(),
135 "frame_id": frame.id.to_string(),
136 });
137
138 if pipeline_data.is_nothing() {
139 let _ = store.append(
140 Frame::builder(topic)
141 .maybe_ttl(ttl)
142 .meta(base_meta)
143 .build(),
144 );
145 } else {
146 let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
147 if use_cas {
148 let json_value = value_to_json(&value);
149 let hash =
150 store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
151 let _ = store.append(
152 Frame::builder(topic)
153 .maybe_ttl(ttl)
154 .hash(hash)
155 .meta(base_meta)
156 .build(),
157 );
158 } else {
159 match &value {
160 nu_protocol::Value::Record { .. } => {
161 let json = value_to_json(&value);
162 if let serde_json::Value::Object(map) = json {
163 for (k, v) in map {
164 base_meta[k] = v;
165 }
166 }
167 let _ = store.append(
168 Frame::builder(topic)
169 .maybe_ttl(ttl)
170 .meta(base_meta)
171 .build(),
172 );
173 }
174 _ => {
175 return Err(format!(
176 "Action output must be a record when target is not \"cas\"; got {}. \
177 Set return_options.target to \"cas\" for non-record output.",
178 value.get_type()
179 ).into());
180 }
181 }
182 }
183 }
184
185 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
186 }
187 Err(err) => {
188 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
190 let _ = store.append(
191 Frame::builder(format!(
192 "{topic}.error",
193 topic = frame.topic.strip_suffix(".call").unwrap()
194 ))
195 .meta(serde_json::json!({
196 "action_id": action.id.to_string(),
197 "frame_id": frame.id.to_string(),
198 "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
199 }))
200 .build(),
201 );
202
203 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
204 }
205 }
206 })
207 .await??;
208
209 Ok(())
210}
211
212fn run_action(
213 engine: &nu::Engine,
214 closure: nu_protocol::engine::Closure,
215 frame: &Frame,
216) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
217 let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
218
219 let mut engine_clone = engine.clone();
220 engine_clone.run_closure_in_job(
221 &closure,
222 vec![arg_val],
223 None,
224 format!("action {topic}", topic = frame.topic),
225 )
226}
227
228fn event_from_frame(
230 frame: &crate::store::Frame,
231) -> Option<(String, crate::processor::lifecycle::Event)> {
232 use crate::processor::lifecycle::Event;
233 let rest = frame.topic.strip_prefix("xs.action.")?;
234 let (name, ev_tag) = split_action_event(rest)?;
235 let event = match ev_tag {
236 "create" => Event::Create { id: frame.id },
237 "term" => Event::Term,
238 "active" => Event::Active {
239 source: source_id(frame)?,
240 },
241 "invalid" => Event::Invalid {
242 source: source_id(frame)?,
243 },
244 "fin.term" | "fin.replaced" => Event::Fin,
245 "replaced" => Event::Replaced,
246 _ => return None,
247 };
248 Some((name.to_string(), event))
249}
250
/// Splits `<name>.<tag>` into the action name and its lifecycle tag.
///
/// The two-segment `fin.*` tags are tried before the single-segment ones, so
/// `foo.fin.term` yields `("foo", "fin.term")` rather than `("foo.fin", "term")`.
/// Returns `None` when `rest` does not end in `.<tag>` for any known tag.
fn split_action_event(rest: &str) -> Option<(&str, &str)> {
    // Order matters: longer, dotted tags must win over their trailing segment.
    const TAGS: [&str; 7] = [
        "fin.term",
        "fin.replaced",
        "create",
        "term",
        "active",
        "invalid",
        "replaced",
    ];

    TAGS.iter().find_map(|&tag| {
        let name = rest.strip_suffix(tag)?.strip_suffix('.')?;
        Some((name, tag))
    })
}
264
265fn source_id(frame: &crate::store::Frame) -> Option<scru128::Scru128Id> {
266 use std::str::FromStr;
267 let meta = frame.meta.as_ref()?;
268 let s = meta.get("action_id").and_then(|v| v.as_str())?;
269 scru128::Scru128Id::from_str(s).ok()
270}
271
272#[derive(Default)]
273struct TopicState {
274 slots: crate::processor::lifecycle::Slots,
275 frames: HashMap<scru128::Scru128Id, crate::store::Frame>,
277}
278
279pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
280 let rx = store
281 .read(ReadOptions::builder().follow(FollowOption::On).build())
282 .await;
283 let mut lifecycle = LifecycleReader::new(rx);
284 let mut states: HashMap<String, TopicState> = HashMap::new();
285 let mut active: HashMap<String, Action> = HashMap::new();
286
287 while let Some(event) = lifecycle.recv().await {
288 match event {
289 Lifecycle::Historical(frame) => {
290 if let Some((name, ev)) = event_from_frame(&frame) {
291 let state = states.entry(name).or_default();
292 if let crate::processor::lifecycle::Event::Create { id } = &ev {
293 state.frames.insert(*id, frame.clone());
294 }
295 state.slots.apply(ev);
296 }
297 }
298 Lifecycle::Threshold(_) => {
299 use crate::processor::lifecycle::ThresholdPick;
300 let mut picks: Vec<(String, ThresholdPick)> = states
301 .iter()
302 .map(|(t, s)| (t.clone(), s.slots.threshold()))
303 .collect();
304 picks.sort_by_key(|(_, p)| match p {
305 ThresholdPick::Start { id, .. } => Some(*id),
306 ThresholdPick::None => None,
307 });
308 for (name, pick) in picks {
309 if let ThresholdPick::Start { id, .. } = pick {
310 if let Some(state) = states.get(&name) {
311 if let Some(frame) = state.frames.get(&id).cloned() {
312 handle_define(&frame, &name, &store, &mut active).await;
313 }
314 }
315 }
316 }
317 }
318 Lifecycle::Live(frame) => {
319 use crate::processor::lifecycle::Event;
320 let mut handled_as_lifecycle = false;
321 if let Some((name, ev)) = event_from_frame(&frame) {
322 handled_as_lifecycle = true;
323 let is_create = matches!(ev, Event::Create { .. });
324 let is_term = matches!(ev, Event::Term);
325 let state = states.entry(name.clone()).or_default();
326 if let Event::Create { id } = &ev {
327 state.frames.insert(*id, frame.clone());
328 }
329 state.slots.apply(ev);
330 if is_create {
331 handle_define(&frame, &name, &store, &mut active).await;
332 } else if is_term {
333 if active.remove(&name).is_some() {
335 let _ = store.append(
336 Frame::builder(format!("xs.action.{name}.fin.term"))
337 .meta(serde_json::json!({
338 "frame_id": frame.id.to_string(),
339 }))
340 .build(),
341 );
342 }
343 }
344 }
345 if !handled_as_lifecycle {
348 if let Some(name) = frame.topic.strip_suffix(".call") {
349 let name = name.to_owned();
350 if let Some(action) = active.get(&name) {
351 let store = store.clone();
352 let frame = frame.clone();
353 let action = action.clone();
354 tokio::spawn(async move {
355 if let Err(e) = execute_action(action, &frame, &store).await {
356 tracing::error!("Failed to execute action '{}': {:?}", name, e);
357 let _ = store.append(
361 Frame::builder(format!("{name}.error"))
362 .meta(serde_json::json!({
363 "error": e.to_string(),
364 "call_id": frame.id.to_string(),
365 }))
366 .build(),
367 );
368 }
369 });
370 }
371 }
372 }
373 }
374 }
375 }
376
377 Ok(())
378}