1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
22#[derive(Clone)]
23pub struct Actor {
24 pub id: Scru128Id,
25 pub topic: String,
26 config: ActorConfig,
27 engine_worker: Arc<EngineWorker>,
28 output: Arc<Mutex<Vec<Frame>>>,
29}
30
31#[derive(Clone, Debug)]
32struct ActorConfig {
33 start: Start,
34 pulse: Option<u64>,
35 return_options: Option<ReturnOptions>,
36}
37
38#[derive(Clone, Debug, Default, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40enum Start {
41 First,
42 #[default]
43 New,
44 After(Scru128Id),
45}
46
47#[derive(Deserialize, Debug, Default)]
49#[serde(default)] struct ActorScriptOptions {
51 start: Option<String>,
53 pulse: Option<u64>,
55 return_options: Option<ReturnOptions>,
57 initial: Option<serde_json::Value>,
59}
60
61pub(super) enum ClosureResult {
62 Continue {
63 output: Option<Value>,
64 next_state: Value,
65 },
66 Stop {
67 output: Option<Value>,
68 },
69}
70
71impl Actor {
72 pub async fn new(
73 id: Scru128Id,
74 topic: String,
75 mut engine: nu::Engine,
76 expression: String,
77 store: Store,
78 ) -> Result<Self, Error> {
79 let output = Arc::new(Mutex::new(Vec::new()));
80 engine.add_commands(vec![
81 Box::new(commands::cat_command::CatCommand::new(store.clone())),
82 Box::new(commands::last_command::LastCommand::new(store.clone())),
83 Box::new(commands::append_command_buffered::AppendCommand::new(
84 store.clone(),
85 output.clone(),
86 )),
87 ])?;
88
89 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
91
92 let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
94
95 let block = engine
97 .state
98 .get_block(nu_script_config.run_closure.block_id);
99 let num_required = block.signature.required_positional.len();
100 let num_optional = block.signature.optional_positional.len();
101
102 let total_positional = num_required + num_optional;
103 if total_positional != 2 {
104 return Err(format!(
105 "Closure must accept exactly 2 params (frame, state), got {total_positional}"
106 )
107 .into());
108 }
109
110 let span = nu_protocol::Span::unknown();
112 let initial_state = if let Some(json) = initial_json {
113 crate::nu::util::json_to_value(&json, span)
114 } else if num_optional > 0 {
115 let state_param = &block.signature.optional_positional[0];
116 state_param
117 .default_value
118 .clone()
119 .unwrap_or_else(|| Value::nothing(span))
120 } else {
121 Value::nothing(span)
122 };
123
124 let engine_worker = Arc::new(EngineWorker::new(
125 engine,
126 nu_script_config.run_closure,
127 initial_state,
128 ));
129
130 Ok(Self {
131 id,
132 topic,
133 config: actor_config,
134 engine_worker,
135 output,
136 })
137 }
138
139 async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
140 self.engine_worker.eval(frame.clone()).await
141 }
142
143 #[instrument(
144 level = "info",
145 skip(self, frame, store),
146 fields(
147 message = %format!(
148 "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
149 actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
150 )
151 )]
152 async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
153 let result = self.eval_in_thread(frame).await?;
154
155 let (output, should_continue) = match result {
156 ClosureResult::Continue { output, .. } => (output, true),
157 ClosureResult::Stop { output } => (output, false),
158 };
159
160 let additional_frame = match output {
162 Some(ref value)
163 if !is_value_an_append_frame_from_actor(value, &self.id)
164 && !matches!(value, Value::Nothing { .. }) =>
165 {
166 let return_options = self.config.return_options.as_ref();
167 let suffix = return_options
168 .and_then(|ro| ro.suffix.as_deref())
169 .unwrap_or(".out");
170 let use_cas = return_options
171 .and_then(|ro| ro.target.as_deref())
172 .is_some_and(|t| t == "cas");
173
174 let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
175 let ttl = return_options.and_then(|ro| ro.ttl.clone());
176
177 if use_cas {
178 let hash = match value {
179 Value::Binary { val, .. } => store.cas_insert(val).await?,
180 _ => store.cas_insert(&value_to_json(value).to_string()).await?,
181 };
182 Some(
183 Frame::builder(topic)
184 .maybe_ttl(ttl)
185 .maybe_hash(Some(hash))
186 .build(),
187 )
188 } else {
189 match value {
191 Value::Record { .. } => {
192 let json = value_to_json(value);
193 Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
194 }
195 _ => {
196 return Err(format!(
197 "Actor output must be a record when target is not \"cas\"; got {}. \
198 Set return_options.target to \"cas\" for non-record output.",
199 value.get_type()
200 )
201 .into());
202 }
203 }
204 }
205 }
206 _ => None,
207 };
208
209 let output_to_process: Vec<_> = {
211 let mut output = self.output.lock().unwrap();
212 output
213 .drain(..)
214 .chain(additional_frame.into_iter())
215 .collect()
216 };
217
218 for mut output_frame in output_to_process {
219 let meta_obj = output_frame
220 .meta
221 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
222 .as_object_mut()
223 .expect("meta should be an object");
224
225 meta_obj.insert(
226 "actor_id".to_string(),
227 serde_json::Value::String(self.id.to_string()),
228 );
229 meta_obj.insert(
230 "frame_id".to_string(),
231 serde_json::Value::String(frame.id.to_string()),
232 );
233
234 let _ = store.append(output_frame);
235 }
236
237 Ok(should_continue)
238 }
239
240 async fn serve(&mut self, store: &Store, options: ReadOptions) {
241 let mut recver = store.read(options).await;
242 let create_topic = format!("xs.actor.{}.create", self.topic);
243 let term_topic = format!("xs.actor.{}.term", self.topic);
244
245 while let Some(frame) = recver.recv().await {
246 if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
248 continue;
249 }
250
251 if frame.topic == create_topic {
254 let _ = store.append(
255 Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
256 .meta(serde_json::json!({
257 "actor_id": self.id.to_string(),
258 "frame_id": frame.id.to_string(),
259 }))
260 .build(),
261 );
262 break;
263 }
264
265 if frame.topic == term_topic {
267 let _ = store.append(
268 Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
269 .meta(serde_json::json!({
270 "actor_id": self.id.to_string(),
271 "frame_id": frame.id.to_string(),
272 }))
273 .build(),
274 );
275 break;
276 }
277
278 if frame
280 .meta
281 .as_ref()
282 .and_then(|meta| meta.get("actor_id"))
283 .and_then(|actor_id| actor_id.as_str())
284 .filter(|actor_id| *actor_id == self.id.to_string())
285 .is_some()
286 {
287 continue;
288 }
289
290 match self.process_frame(&frame, store).await {
291 Ok(true) => {}
292 Ok(false) => {
293 let _ = store.append(
295 Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
296 .meta(serde_json::json!({
297 "actor_id": self.id.to_string(),
298 "frame_id": frame.id.to_string(),
299 }))
300 .build(),
301 );
302 break;
303 }
304 Err(err) => {
305 let _ = store.append(
307 Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
308 .meta(serde_json::json!({
309 "actor_id": self.id.to_string(),
310 "frame_id": frame.id.to_string(),
311 "error": err.to_string(),
312 }))
313 .build(),
314 );
315 break;
316 }
317 }
318 }
319 }
320
321 pub async fn spawn(&self, store: Store) -> Result<(), Error> {
322 let options = self.configure_read_options().await;
323
324 {
325 let store = store.clone();
326 let options = options.clone();
327 let mut actor = self.clone();
328
329 tokio::spawn(async move {
330 actor.serve(&store, options).await;
331 });
332 }
333
334 let _ = store.append(
335 Frame::builder(format!("xs.actor.{}.active", &self.topic))
336 .meta(serde_json::json!({
337 "actor_id": self.id.to_string(),
338 "start": self.config.start,
339 }))
340 .build(),
341 );
342
343 Ok(())
344 }
345
346 pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
347 let topic = frame
348 .topic
349 .strip_prefix("xs.actor.")
350 .and_then(|rest| rest.strip_suffix(".create"))
351 .ok_or("Frame topic must be xs.actor.<name>.create")?;
352
353 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
355 let mut reader = store
356 .cas_reader(hash.clone())
357 .await
358 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
359
360 let mut expression = String::new();
361 reader
362 .read_to_string(&mut expression)
363 .await
364 .map_err(|e| format!("Failed to read expression: {e}"))?;
365
366 let engine = crate::processor::build_engine(store, &frame.id)?;
368
369 let actor = Actor::new(
370 frame.id,
371 topic.to_string(),
372 engine,
373 expression,
374 store.clone(),
375 )
376 .await?;
377
378 Ok(actor)
379 }
380
381 async fn configure_read_options(&self) -> ReadOptions {
382 let (after, is_new) = match &self.config.start {
384 Start::First => (None, false),
385 Start::New => (None, true),
386 Start::After(id) => (Some(*id), false),
387 };
388
389 let follow_option = self
391 .config
392 .pulse
393 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
394 .unwrap_or(FollowOption::On);
395
396 ReadOptions::builder()
397 .follow(follow_option)
398 .new(is_new)
399 .maybe_after(after)
400 .build()
401 }
402}
403
404use tokio::sync::{mpsc, oneshot};
405
406pub struct EngineWorker {
407 work_tx: mpsc::Sender<WorkItem>,
408}
409
410struct WorkItem {
411 frame: Frame,
412 resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
413}
414
415impl EngineWorker {
416 pub fn new(
417 engine: nu::Engine,
418 closure: nu_protocol::engine::Closure,
419 initial_state: Value,
420 ) -> Self {
421 let (work_tx, mut work_rx) = mpsc::channel(32);
422
423 std::thread::spawn(move || {
424 let mut engine = engine;
425 let mut state = initial_state;
426
427 while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
428 let frame_val =
429 crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
430
431 let pipeline = engine.run_closure_in_job(
432 &closure,
433 vec![frame_val, state.clone()],
434 None,
435 format!("actor {topic}", topic = frame.topic),
436 );
437
438 let result = pipeline
439 .map_err(|e| {
440 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
441 Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
442 })
443 .and_then(|pd| {
444 pd.into_value(nu_protocol::Span::unknown())
445 .map_err(Error::from)
446 })
447 .and_then(interpret_closure_result);
448
449 if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
450 state = next_state.clone();
451 }
452
453 let _ = resp_tx.send(result);
454 }
455 });
456
457 Self { work_tx }
458 }
459
460 pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
461 let (resp_tx, resp_rx) = oneshot::channel();
462 let work_item = WorkItem { frame, resp_tx };
463
464 self.work_tx
465 .send(work_item)
466 .await
467 .map_err(|_| Error::from("Engine worker thread has terminated"))?;
468
469 resp_rx
470 .await
471 .map_err(|_| Error::from("Engine worker thread has terminated"))?
472 }
473}
474
475fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
476 match value {
477 Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
478 Value::Record { ref val, .. } => {
479 for key in val.columns() {
480 if key != "out" && key != "next" {
481 return Err(format!(
482 "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
483 )
484 .into());
485 }
486 }
487 let output = val.get("out").cloned();
488 match val.get("next").cloned() {
489 Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
490 None => Ok(ClosureResult::Stop { output }),
491 }
492 }
493 _ => Err(format!(
494 "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
495 value.get_type()
496 )
497 .into()),
498 }
499}
500
501fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
502 value
503 .as_record()
504 .ok()
505 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
506 .and_then(|record| record.get("meta"))
507 .and_then(|meta| meta.as_record().ok())
508 .and_then(|meta_record| meta_record.get("actor_id"))
509 .and_then(|id| id.as_str().ok())
510 .filter(|id| *id == actor_id.to_string())
511 .is_some()
512}
513
514fn extract_actor_config(
516 script_config: &NuScriptConfig,
517) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
518 let script_options: ActorScriptOptions = script_config.deserialize_options()?;
520
521 let start =
523 match script_options.start.as_deref() {
524 Some("first") => Start::First,
525 Some("new") => Start::New,
526 Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
527 format!("Invalid scru128 ID for start: {id_str}").into()
528 })?),
529 None => Start::default(), };
531
532 Ok((
534 ActorConfig {
535 start,
536 pulse: script_options.pulse,
537 return_options: script_options.return_options,
538 },
539 script_options.initial,
540 ))
541}