1use scru128::Scru128Id;
2use std::collections::HashMap;
3use tracing::instrument;
4
5use crate::error::Error;
6use crate::nu;
7use crate::nu::commands;
8use crate::nu::{value_to_json, ReturnOptions};
9use crate::processor::{Lifecycle, LifecycleReader};
10use crate::store::{FollowOption, Frame, ReadOptions, Store};
11
/// A registered action, as stored per name in the processor's `active` map.
#[derive(Clone)]
struct Action {
    /// Id of the frame the action was registered from.
    id: Scru128Id,
    /// Engine prepared at registration time; it is cloned for every call.
    engine: nu::Engine,
    /// Definition text read from CAS; it is parsed again on every execution.
    definition: String,
    /// Optional settings (suffix, ttl, target) applied to the response frame.
    return_options: Option<ReturnOptions>,
}
19
20async fn handle_define(
21 frame: &Frame,
22 name: &str,
23 store: &Store,
24 active: &mut HashMap<String, Action>,
25) {
26 match register_action(frame, store).await {
27 Ok(action) => {
28 active.insert(name.to_string(), action);
29 let _ = store.append(
30 Frame::builder(format!("{name}.ready"))
31 .meta(serde_json::json!({
32 "action_id": frame.id.to_string(),
33 }))
34 .build(),
35 );
36 }
37 Err(err) => {
38 let _ = store.append(
39 Frame::builder(format!("{name}.error"))
40 .meta(serde_json::json!({
41 "action_id": frame.id.to_string(),
42 "error": err.to_string(),
43 }))
44 .build(),
45 );
46 }
47 }
48}
49
50async fn register_action(frame: &Frame, store: &Store) -> Result<Action, Error> {
51 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
53 let definition_bytes = store.cas_read(hash).await?;
54 let definition = String::from_utf8(definition_bytes)?;
55
56 let mut engine = crate::processor::build_engine(store, &frame.id)?;
58
59 engine.add_commands(vec![
61 Box::new(commands::cat_stream_command::CatStreamCommand::new(
62 store.clone(),
63 )),
64 Box::new(commands::last_stream_command::LastStreamCommand::new(
65 store.clone(),
66 )),
67 ])?;
68
69 let nu_config = nu::parse_config(&mut engine, &definition)?;
71
72 #[derive(serde::Deserialize, Default)]
74 struct ActionOptions {
75 return_options: Option<ReturnOptions>,
76 }
77
78 let action_options: ActionOptions = nu_config.deserialize_options().unwrap_or_default();
79
80 Ok(Action {
81 id: frame.id,
82 engine,
83 definition,
84 return_options: action_options.return_options,
85 })
86}
87
88#[instrument(
89 level = "info",
90 skip(action, frame, store),
91 fields(
92 message = %format!(
93 "action={id} frame={frame_id}:{topic}",
94 id = action.id, frame_id = frame.id, topic = frame.topic
95 )
96 )
97)]
98async fn execute_action(action: Action, frame: &Frame, store: &Store) -> Result<(), Error> {
99 let store = store.clone();
100 let frame = frame.clone();
101
102 tokio::task::spawn_blocking(move || {
103 let base_meta = serde_json::json!({
104 "action_id": action.id.to_string(),
105 "frame_id": frame.id.to_string()
106 });
107
108 let mut engine = action.engine;
109
110 engine.add_commands(vec![Box::new(
111 commands::append_command::AppendCommand::new(store.clone(), base_meta),
112 )])?;
113
114 let nu_config = nu::parse_config(&mut engine, &action.definition)?;
116
117 match run_action(&engine, nu_config.run_closure, &frame) {
119 Ok(pipeline_data) => {
120 let resp_suffix = action
121 .return_options
122 .as_ref()
123 .and_then(|opts| opts.suffix.as_deref())
124 .unwrap_or(".response");
125 let ttl = action
126 .return_options
127 .as_ref()
128 .and_then(|opts| opts.ttl.clone());
129 let use_cas = action
130 .return_options
131 .as_ref()
132 .and_then(|o| o.target.as_deref())
133 .is_some_and(|t| t == "cas");
134
135 let topic = format!(
136 "{topic}{suffix}",
137 topic = frame.topic.strip_suffix(".call").unwrap(),
138 suffix = resp_suffix
139 );
140
141 let mut base_meta = serde_json::json!({
142 "action_id": action.id.to_string(),
143 "frame_id": frame.id.to_string(),
144 });
145
146 if pipeline_data.is_nothing() {
147 let _ = store.append(
148 Frame::builder(topic)
149 .maybe_ttl(ttl)
150 .meta(base_meta)
151 .build(),
152 );
153 } else {
154 let value = pipeline_data.into_value(nu_protocol::Span::unknown())?;
155 if use_cas {
156 let json_value = value_to_json(&value);
157 let hash =
158 store.cas_insert_sync(serde_json::to_string(&json_value)?)?;
159 let _ = store.append(
160 Frame::builder(topic)
161 .maybe_ttl(ttl)
162 .hash(hash)
163 .meta(base_meta)
164 .build(),
165 );
166 } else {
167 match &value {
168 nu_protocol::Value::Record { .. } => {
169 let json = value_to_json(&value);
170 if let serde_json::Value::Object(map) = json {
171 for (k, v) in map {
172 base_meta[k] = v;
173 }
174 }
175 let _ = store.append(
176 Frame::builder(topic)
177 .maybe_ttl(ttl)
178 .meta(base_meta)
179 .build(),
180 );
181 }
182 _ => {
183 return Err(format!(
184 "Action output must be a record when target is not \"cas\"; got {}. \
185 Set return_options.target to \"cas\" for non-record output.",
186 value.get_type()
187 ).into());
188 }
189 }
190 }
191 }
192
193 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
194 }
195 Err(err) => {
196 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
198 let _ = store.append(
199 Frame::builder(format!(
200 "{topic}.error",
201 topic = frame.topic.strip_suffix(".call").unwrap()
202 ))
203 .meta(serde_json::json!({
204 "action_id": action.id.to_string(),
205 "frame_id": frame.id.to_string(),
206 "error": nu_protocol::format_cli_error(None, &working_set, &*err, None)
207 }))
208 .build(),
209 );
210
211 Ok(()) as Result<(), Box<dyn std::error::Error + Send + Sync>>
212 }
213 }
214 })
215 .await??;
216
217 Ok(())
218}
219
220fn run_action(
221 engine: &nu::Engine,
222 closure: nu_protocol::engine::Closure,
223 frame: &Frame,
224) -> Result<nu_protocol::PipelineData, Box<nu_protocol::ShellError>> {
225 let arg_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
226
227 let mut engine_clone = engine.clone();
228 engine_clone.run_closure_in_job(
229 &closure,
230 vec![arg_val],
231 None,
232 format!("action {topic}", topic = frame.topic),
233 )
234}
235
236pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
237 let rx = store
238 .read(ReadOptions::builder().follow(FollowOption::On).build())
239 .await;
240 let mut lifecycle = LifecycleReader::new(rx);
241 let mut compacted: HashMap<String, Frame> = HashMap::new();
242 let mut active: HashMap<String, Action> = HashMap::new();
243
244 while let Some(event) = lifecycle.recv().await {
245 match event {
246 Lifecycle::Historical(frame) => {
247 if let Some(name) = frame.topic.strip_suffix(".define") {
248 compacted.insert(name.to_string(), frame);
249 }
250 }
251 Lifecycle::Threshold(_) => {
252 let mut ordered: Vec<_> = compacted.drain().collect();
253 ordered.sort_by_key(|(_, frame)| frame.id);
254
255 for (name, frame) in ordered {
256 handle_define(&frame, &name, &store, &mut active).await;
257 }
258 }
259 Lifecycle::Live(frame) => {
260 if let Some(name) = frame.topic.strip_suffix(".define") {
261 handle_define(&frame, name, &store, &mut active).await;
262 } else if let Some(name) = frame.topic.strip_suffix(".call") {
263 let name = name.to_owned();
264 if let Some(action) = active.get(&name) {
265 let store = store.clone();
266 let frame = frame.clone();
267 let action = action.clone();
268 tokio::spawn(async move {
269 if let Err(e) = execute_action(action, &frame, &store).await {
270 tracing::error!("Failed to execute action '{}': {:?}", name, e);
271 let _ = store.append(
272 Frame::builder(format!("{name}.error"))
273 .meta(serde_json::json!({
274 "error": e.to_string(),
275 }))
276 .build(),
277 );
278 }
279 });
280 }
281 }
282 }
283 }
284 }
285
286 Ok(())
287}