1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::value_to_json;
18use crate::nu::{NuScriptConfig, ReturnOptions};
19use crate::store::{FollowOption, Frame, ReadOptions, Store};
20
21#[derive(Clone)]
22pub struct Actor {
23 pub id: Scru128Id,
24 pub topic: String,
25 config: ActorConfig,
26 engine_worker: Arc<EngineWorker>,
27 output: Arc<Mutex<Vec<Frame>>>,
28}
29
30#[derive(Clone, Debug)]
31struct ActorConfig {
32 start: Start,
33 pulse: Option<u64>,
34 return_options: Option<ReturnOptions>,
35}
36
37#[derive(Clone, Debug, Default, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39enum Start {
40 First,
41 #[default]
42 New,
43 After(Scru128Id),
44}
45
46#[derive(Deserialize, Debug, Default)]
48#[serde(default)] struct ActorScriptOptions {
50 start: Option<String>,
52 pulse: Option<u64>,
54 return_options: Option<ReturnOptions>,
56 initial: Option<serde_json::Value>,
58}
59
60pub(super) enum ClosureResult {
61 Continue {
62 output: Option<Value>,
63 next_state: Value,
64 },
65 Stop {
66 output: Option<Value>,
67 },
68}
69
70impl Actor {
71 pub async fn new(
72 id: Scru128Id,
73 topic: String,
74 mut engine: nu::Engine,
75 expression: String,
76 store: Store,
77 ) -> Result<Self, Error> {
78 let output = Arc::new(Mutex::new(Vec::new()));
79 nu::add_write_commands(
82 &mut engine,
83 &store,
84 nu::AppendMode::Buffered(output.clone()),
85 )?;
86
87 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
89
90 let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
92
93 let block = engine
95 .state
96 .get_block(nu_script_config.run_closure.block_id);
97 let num_required = block.signature.required_positional.len();
98 let num_optional = block.signature.optional_positional.len();
99
100 let total_positional = num_required + num_optional;
101 if total_positional != 2 {
102 return Err(format!(
103 "Closure must accept exactly 2 params (frame, state), got {total_positional}"
104 )
105 .into());
106 }
107
108 let span = nu_protocol::Span::unknown();
110 let initial_state = if let Some(json) = initial_json {
111 crate::nu::util::json_to_value(&json, span)
112 } else if num_optional > 0 {
113 let state_param = &block.signature.optional_positional[0];
114 state_param
115 .default_value
116 .clone()
117 .unwrap_or_else(|| Value::nothing(span))
118 } else {
119 Value::nothing(span)
120 };
121
122 let engine_worker = Arc::new(EngineWorker::new(
123 engine,
124 nu_script_config.run_closure,
125 initial_state,
126 ));
127
128 Ok(Self {
129 id,
130 topic,
131 config: actor_config,
132 engine_worker,
133 output,
134 })
135 }
136
137 async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
138 self.engine_worker.eval(frame.clone()).await
139 }
140
141 #[instrument(
142 level = "info",
143 skip(self, frame, store),
144 fields(
145 message = %format!(
146 "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
147 actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
148 )
149 )]
150 async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
151 let result = self.eval_in_thread(frame).await?;
152
153 let (output, should_continue) = match result {
154 ClosureResult::Continue { output, .. } => (output, true),
155 ClosureResult::Stop { output } => (output, false),
156 };
157
158 let additional_frame = match output {
160 Some(ref value)
161 if !is_value_an_append_frame_from_actor(value, &self.id)
162 && !matches!(value, Value::Nothing { .. }) =>
163 {
164 let return_options = self.config.return_options.as_ref();
165 let suffix = return_options
166 .and_then(|ro| ro.suffix.as_deref())
167 .unwrap_or(".out");
168 let use_cas = return_options
169 .and_then(|ro| ro.target.as_deref())
170 .is_some_and(|t| t == "cas");
171
172 let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
173 let ttl = return_options.and_then(|ro| ro.ttl.clone());
174
175 if use_cas {
176 let hash = match value {
177 Value::Binary { val, .. } => store.cas_insert(val).await?,
178 _ => store.cas_insert(&value_to_json(value).to_string()).await?,
179 };
180 Some(
181 Frame::builder(topic)
182 .maybe_ttl(ttl)
183 .maybe_hash(Some(hash))
184 .build(),
185 )
186 } else {
187 match value {
189 Value::Record { .. } => {
190 let json = value_to_json(value);
191 Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
192 }
193 _ => {
194 return Err(format!(
195 "Actor output must be a record when target is not \"cas\"; got {}. \
196 Set return_options.target to \"cas\" for non-record output.",
197 value.get_type()
198 )
199 .into());
200 }
201 }
202 }
203 }
204 _ => None,
205 };
206
207 let output_to_process: Vec<_> = {
209 let mut output = self.output.lock().unwrap();
210 output
211 .drain(..)
212 .chain(additional_frame.into_iter())
213 .collect()
214 };
215
216 for mut output_frame in output_to_process {
217 let meta_obj = output_frame
218 .meta
219 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
220 .as_object_mut()
221 .expect("meta should be an object");
222
223 meta_obj.insert(
224 "actor_id".to_string(),
225 serde_json::Value::String(self.id.to_string()),
226 );
227 meta_obj.insert(
228 "frame_id".to_string(),
229 serde_json::Value::String(frame.id.to_string()),
230 );
231
232 let _ = store.append(output_frame);
233 }
234
235 Ok(should_continue)
236 }
237
238 async fn serve(&mut self, store: &Store, options: ReadOptions) {
239 let mut recver = store.read(options).await;
240 let create_topic = format!("xs.actor.{}.create", self.topic);
241 let term_topic = format!("xs.actor.{}.term", self.topic);
242
243 while let Some(frame) = recver.recv().await {
244 if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
246 continue;
247 }
248
249 if frame.topic == create_topic {
252 let _ = store.append(
253 Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
254 .meta(serde_json::json!({
255 "actor_id": self.id.to_string(),
256 "frame_id": frame.id.to_string(),
257 }))
258 .build(),
259 );
260 break;
261 }
262
263 if frame.topic == term_topic {
265 let _ = store.append(
266 Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
267 .meta(serde_json::json!({
268 "actor_id": self.id.to_string(),
269 "frame_id": frame.id.to_string(),
270 }))
271 .build(),
272 );
273 break;
274 }
275
276 if frame
278 .meta
279 .as_ref()
280 .and_then(|meta| meta.get("actor_id"))
281 .and_then(|actor_id| actor_id.as_str())
282 .filter(|actor_id| *actor_id == self.id.to_string())
283 .is_some()
284 {
285 continue;
286 }
287
288 match self.process_frame(&frame, store).await {
289 Ok(true) => {}
290 Ok(false) => {
291 let _ = store.append(
293 Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
294 .meta(serde_json::json!({
295 "actor_id": self.id.to_string(),
296 "frame_id": frame.id.to_string(),
297 }))
298 .build(),
299 );
300 break;
301 }
302 Err(err) => {
303 let _ = store.append(
305 Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
306 .meta(serde_json::json!({
307 "actor_id": self.id.to_string(),
308 "frame_id": frame.id.to_string(),
309 "error": err.to_string(),
310 }))
311 .build(),
312 );
313 break;
314 }
315 }
316 }
317 }
318
319 pub async fn spawn(&self, store: Store) -> Result<(), Error> {
320 let options = self.configure_read_options().await;
321
322 {
323 let store = store.clone();
324 let options = options.clone();
325 let mut actor = self.clone();
326
327 tokio::spawn(async move {
328 actor.serve(&store, options).await;
329 });
330 }
331
332 let _ = store.append(
333 Frame::builder(format!("xs.actor.{}.active", &self.topic))
334 .meta(serde_json::json!({
335 "actor_id": self.id.to_string(),
336 "start": self.config.start,
337 }))
338 .build(),
339 );
340
341 Ok(())
342 }
343
344 pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
345 let topic = frame
346 .topic
347 .strip_prefix("xs.actor.")
348 .and_then(|rest| rest.strip_suffix(".create"))
349 .ok_or("Frame topic must be xs.actor.<name>.create")?;
350
351 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
353 let mut reader = store
354 .cas_reader(hash.clone())
355 .await
356 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
357
358 let mut expression = String::new();
359 reader
360 .read_to_string(&mut expression)
361 .await
362 .map_err(|e| format!("Failed to read expression: {e}"))?;
363
364 let mut engine = nu::prepared_base(store, nu::ReadMode::Plain, false)?;
367 let modules = store.nu_modules_at(&frame.id);
368 nu::load_modules(&mut engine.state, store, &modules)?;
369
370 let actor = Actor::new(
371 frame.id,
372 topic.to_string(),
373 engine,
374 expression,
375 store.clone(),
376 )
377 .await?;
378
379 Ok(actor)
380 }
381
382 async fn configure_read_options(&self) -> ReadOptions {
383 let (after, is_new) = match &self.config.start {
385 Start::First => (None, false),
386 Start::New => (None, true),
387 Start::After(id) => (Some(*id), false),
388 };
389
390 let follow_option = self
392 .config
393 .pulse
394 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
395 .unwrap_or(FollowOption::On);
396
397 ReadOptions::builder()
398 .follow(follow_option)
399 .new(is_new)
400 .maybe_after(after)
401 .build()
402 }
403}
404
405use tokio::sync::{mpsc, oneshot};
406
407pub struct EngineWorker {
408 work_tx: mpsc::Sender<WorkItem>,
409}
410
411struct WorkItem {
412 frame: Frame,
413 resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
414}
415
416impl EngineWorker {
417 pub fn new(
418 engine: nu::Engine,
419 closure: nu_protocol::engine::Closure,
420 initial_state: Value,
421 ) -> Self {
422 let (work_tx, mut work_rx) = mpsc::channel(32);
423
424 std::thread::spawn(move || {
425 let mut engine = engine;
426 let mut state = initial_state;
427
428 while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
429 let frame_val =
430 crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
431
432 let pipeline = engine.run_closure_in_job(
433 &closure,
434 vec![frame_val, state.clone()],
435 None,
436 format!("actor {topic}", topic = frame.topic),
437 );
438
439 let result = pipeline
440 .map_err(|e| {
441 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
442 Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
443 })
444 .and_then(|pd| {
445 pd.into_value(nu_protocol::Span::unknown())
446 .map_err(Error::from)
447 })
448 .and_then(interpret_closure_result);
449
450 if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
451 state = next_state.clone();
452 }
453
454 let _ = resp_tx.send(result);
455 }
456 });
457
458 Self { work_tx }
459 }
460
461 pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
462 let (resp_tx, resp_rx) = oneshot::channel();
463 let work_item = WorkItem { frame, resp_tx };
464
465 self.work_tx
466 .send(work_item)
467 .await
468 .map_err(|_| Error::from("Engine worker thread has terminated"))?;
469
470 resp_rx
471 .await
472 .map_err(|_| Error::from("Engine worker thread has terminated"))?
473 }
474}
475
476fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
477 match value {
478 Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
479 Value::Record { ref val, .. } => {
480 for key in val.columns() {
481 if key != "out" && key != "next" {
482 return Err(format!(
483 "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
484 )
485 .into());
486 }
487 }
488 let output = val.get("out").cloned();
489 match val.get("next").cloned() {
490 Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
491 None => Ok(ClosureResult::Stop { output }),
492 }
493 }
494 _ => Err(format!(
495 "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
496 value.get_type()
497 )
498 .into()),
499 }
500}
501
502fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
503 value
504 .as_record()
505 .ok()
506 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
507 .and_then(|record| record.get("meta"))
508 .and_then(|meta| meta.as_record().ok())
509 .and_then(|meta_record| meta_record.get("actor_id"))
510 .and_then(|id| id.as_str().ok())
511 .filter(|id| *id == actor_id.to_string())
512 .is_some()
513}
514
515fn extract_actor_config(
517 script_config: &NuScriptConfig,
518) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
519 let script_options: ActorScriptOptions = script_config.deserialize_options()?;
521
522 let start =
524 match script_options.start.as_deref() {
525 Some("first") => Start::First,
526 Some("new") => Start::New,
527 Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
528 format!("Invalid scru128 ID for start: {id_str}").into()
529 })?),
530 None => Start::default(), };
532
533 Ok((
535 ActorConfig {
536 start,
537 pulse: script_options.pulse,
538 return_options: script_options.return_options,
539 },
540 script_options.initial,
541 ))
542}