1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::value_to_json;
18use crate::nu::{NuScriptConfig, ReturnOptions};
19use crate::store::{FollowOption, Frame, ReadOptions, Store};
20
21#[derive(Clone)]
22pub struct Actor {
23 pub id: Scru128Id,
24 pub topic: String,
25 config: ActorConfig,
26 engine_worker: Arc<EngineWorker>,
27 output: Arc<Mutex<Vec<Frame>>>,
28}
29
30#[derive(Clone, Debug)]
31struct ActorConfig {
32 start: Start,
33 pulse: Option<u64>,
34 return_options: Option<ReturnOptions>,
35}
36
37#[derive(Clone, Debug, Default, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39enum Start {
40 First,
41 #[default]
42 New,
43 After(Scru128Id),
44}
45
46#[derive(Deserialize, Debug, Default)]
48#[serde(default)] struct ActorScriptOptions {
50 start: Option<String>,
52 pulse: Option<u64>,
54 return_options: Option<ReturnOptions>,
56 initial: Option<serde_json::Value>,
58}
59
60pub(super) enum ClosureResult {
61 Continue {
62 output: Option<Value>,
63 next_state: Value,
64 },
65 Stop {
66 output: Option<Value>,
67 },
68}
69
70impl Actor {
71 pub async fn new(
72 id: Scru128Id,
73 topic: String,
74 mut engine: nu::Engine,
75 expression: String,
76 store: Store,
77 ) -> Result<Self, Error> {
78 let output = Arc::new(Mutex::new(Vec::new()));
79 nu::add_read_commands(&mut engine, &store, nu::ReadMode::Plain)?;
80 nu::add_write_commands(
81 &mut engine,
82 &store,
83 nu::AppendMode::Buffered(output.clone()),
84 )?;
85
86 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
88
89 let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
91
92 let block = engine
94 .state
95 .get_block(nu_script_config.run_closure.block_id);
96 let num_required = block.signature.required_positional.len();
97 let num_optional = block.signature.optional_positional.len();
98
99 let total_positional = num_required + num_optional;
100 if total_positional != 2 {
101 return Err(format!(
102 "Closure must accept exactly 2 params (frame, state), got {total_positional}"
103 )
104 .into());
105 }
106
107 let span = nu_protocol::Span::unknown();
109 let initial_state = if let Some(json) = initial_json {
110 crate::nu::util::json_to_value(&json, span)
111 } else if num_optional > 0 {
112 let state_param = &block.signature.optional_positional[0];
113 state_param
114 .default_value
115 .clone()
116 .unwrap_or_else(|| Value::nothing(span))
117 } else {
118 Value::nothing(span)
119 };
120
121 let engine_worker = Arc::new(EngineWorker::new(
122 engine,
123 nu_script_config.run_closure,
124 initial_state,
125 ));
126
127 Ok(Self {
128 id,
129 topic,
130 config: actor_config,
131 engine_worker,
132 output,
133 })
134 }
135
136 async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
137 self.engine_worker.eval(frame.clone()).await
138 }
139
140 #[instrument(
141 level = "info",
142 skip(self, frame, store),
143 fields(
144 message = %format!(
145 "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
146 actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
147 )
148 )]
149 async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
150 let result = self.eval_in_thread(frame).await?;
151
152 let (output, should_continue) = match result {
153 ClosureResult::Continue { output, .. } => (output, true),
154 ClosureResult::Stop { output } => (output, false),
155 };
156
157 let additional_frame = match output {
159 Some(ref value)
160 if !is_value_an_append_frame_from_actor(value, &self.id)
161 && !matches!(value, Value::Nothing { .. }) =>
162 {
163 let return_options = self.config.return_options.as_ref();
164 let suffix = return_options
165 .and_then(|ro| ro.suffix.as_deref())
166 .unwrap_or(".out");
167 let use_cas = return_options
168 .and_then(|ro| ro.target.as_deref())
169 .is_some_and(|t| t == "cas");
170
171 let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
172 let ttl = return_options.and_then(|ro| ro.ttl.clone());
173
174 if use_cas {
175 let hash = match value {
176 Value::Binary { val, .. } => store.cas_insert(val).await?,
177 _ => store.cas_insert(&value_to_json(value).to_string()).await?,
178 };
179 Some(
180 Frame::builder(topic)
181 .maybe_ttl(ttl)
182 .maybe_hash(Some(hash))
183 .build(),
184 )
185 } else {
186 match value {
188 Value::Record { .. } => {
189 let json = value_to_json(value);
190 Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
191 }
192 _ => {
193 return Err(format!(
194 "Actor output must be a record when target is not \"cas\"; got {}. \
195 Set return_options.target to \"cas\" for non-record output.",
196 value.get_type()
197 )
198 .into());
199 }
200 }
201 }
202 }
203 _ => None,
204 };
205
206 let output_to_process: Vec<_> = {
208 let mut output = self.output.lock().unwrap();
209 output
210 .drain(..)
211 .chain(additional_frame.into_iter())
212 .collect()
213 };
214
215 for mut output_frame in output_to_process {
216 let meta_obj = output_frame
217 .meta
218 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
219 .as_object_mut()
220 .expect("meta should be an object");
221
222 meta_obj.insert(
223 "actor_id".to_string(),
224 serde_json::Value::String(self.id.to_string()),
225 );
226 meta_obj.insert(
227 "frame_id".to_string(),
228 serde_json::Value::String(frame.id.to_string()),
229 );
230
231 let _ = store.append(output_frame);
232 }
233
234 Ok(should_continue)
235 }
236
237 async fn serve(&mut self, store: &Store, options: ReadOptions) {
238 let mut recver = store.read(options).await;
239 let create_topic = format!("xs.actor.{}.create", self.topic);
240 let term_topic = format!("xs.actor.{}.term", self.topic);
241
242 while let Some(frame) = recver.recv().await {
243 if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
245 continue;
246 }
247
248 if frame.topic == create_topic {
251 let _ = store.append(
252 Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
253 .meta(serde_json::json!({
254 "actor_id": self.id.to_string(),
255 "frame_id": frame.id.to_string(),
256 }))
257 .build(),
258 );
259 break;
260 }
261
262 if frame.topic == term_topic {
264 let _ = store.append(
265 Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
266 .meta(serde_json::json!({
267 "actor_id": self.id.to_string(),
268 "frame_id": frame.id.to_string(),
269 }))
270 .build(),
271 );
272 break;
273 }
274
275 if frame
277 .meta
278 .as_ref()
279 .and_then(|meta| meta.get("actor_id"))
280 .and_then(|actor_id| actor_id.as_str())
281 .filter(|actor_id| *actor_id == self.id.to_string())
282 .is_some()
283 {
284 continue;
285 }
286
287 match self.process_frame(&frame, store).await {
288 Ok(true) => {}
289 Ok(false) => {
290 let _ = store.append(
292 Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
293 .meta(serde_json::json!({
294 "actor_id": self.id.to_string(),
295 "frame_id": frame.id.to_string(),
296 }))
297 .build(),
298 );
299 break;
300 }
301 Err(err) => {
302 let _ = store.append(
304 Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
305 .meta(serde_json::json!({
306 "actor_id": self.id.to_string(),
307 "frame_id": frame.id.to_string(),
308 "error": err.to_string(),
309 }))
310 .build(),
311 );
312 break;
313 }
314 }
315 }
316 }
317
318 pub async fn spawn(&self, store: Store) -> Result<(), Error> {
319 let options = self.configure_read_options().await;
320
321 {
322 let store = store.clone();
323 let options = options.clone();
324 let mut actor = self.clone();
325
326 tokio::spawn(async move {
327 actor.serve(&store, options).await;
328 });
329 }
330
331 let _ = store.append(
332 Frame::builder(format!("xs.actor.{}.active", &self.topic))
333 .meta(serde_json::json!({
334 "actor_id": self.id.to_string(),
335 "start": self.config.start,
336 }))
337 .build(),
338 );
339
340 Ok(())
341 }
342
343 pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
344 let topic = frame
345 .topic
346 .strip_prefix("xs.actor.")
347 .and_then(|rest| rest.strip_suffix(".create"))
348 .ok_or("Frame topic must be xs.actor.<name>.create")?;
349
350 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
352 let mut reader = store
353 .cas_reader(hash.clone())
354 .await
355 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
356
357 let mut expression = String::new();
358 reader
359 .read_to_string(&mut expression)
360 .await
361 .map_err(|e| format!("Failed to read expression: {e}"))?;
362
363 let engine = crate::processor::build_engine(store, &frame.id)?;
365
366 let actor = Actor::new(
367 frame.id,
368 topic.to_string(),
369 engine,
370 expression,
371 store.clone(),
372 )
373 .await?;
374
375 Ok(actor)
376 }
377
378 async fn configure_read_options(&self) -> ReadOptions {
379 let (after, is_new) = match &self.config.start {
381 Start::First => (None, false),
382 Start::New => (None, true),
383 Start::After(id) => (Some(*id), false),
384 };
385
386 let follow_option = self
388 .config
389 .pulse
390 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
391 .unwrap_or(FollowOption::On);
392
393 ReadOptions::builder()
394 .follow(follow_option)
395 .new(is_new)
396 .maybe_after(after)
397 .build()
398 }
399}
400
401use tokio::sync::{mpsc, oneshot};
402
403pub struct EngineWorker {
404 work_tx: mpsc::Sender<WorkItem>,
405}
406
407struct WorkItem {
408 frame: Frame,
409 resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
410}
411
412impl EngineWorker {
413 pub fn new(
414 engine: nu::Engine,
415 closure: nu_protocol::engine::Closure,
416 initial_state: Value,
417 ) -> Self {
418 let (work_tx, mut work_rx) = mpsc::channel(32);
419
420 std::thread::spawn(move || {
421 let mut engine = engine;
422 let mut state = initial_state;
423
424 while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
425 let frame_val =
426 crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
427
428 let pipeline = engine.run_closure_in_job(
429 &closure,
430 vec![frame_val, state.clone()],
431 None,
432 format!("actor {topic}", topic = frame.topic),
433 );
434
435 let result = pipeline
436 .map_err(|e| {
437 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
438 Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
439 })
440 .and_then(|pd| {
441 pd.into_value(nu_protocol::Span::unknown())
442 .map_err(Error::from)
443 })
444 .and_then(interpret_closure_result);
445
446 if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
447 state = next_state.clone();
448 }
449
450 let _ = resp_tx.send(result);
451 }
452 });
453
454 Self { work_tx }
455 }
456
457 pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
458 let (resp_tx, resp_rx) = oneshot::channel();
459 let work_item = WorkItem { frame, resp_tx };
460
461 self.work_tx
462 .send(work_item)
463 .await
464 .map_err(|_| Error::from("Engine worker thread has terminated"))?;
465
466 resp_rx
467 .await
468 .map_err(|_| Error::from("Engine worker thread has terminated"))?
469 }
470}
471
472fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
473 match value {
474 Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
475 Value::Record { ref val, .. } => {
476 for key in val.columns() {
477 if key != "out" && key != "next" {
478 return Err(format!(
479 "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
480 )
481 .into());
482 }
483 }
484 let output = val.get("out").cloned();
485 match val.get("next").cloned() {
486 Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
487 None => Ok(ClosureResult::Stop { output }),
488 }
489 }
490 _ => Err(format!(
491 "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
492 value.get_type()
493 )
494 .into()),
495 }
496}
497
498fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
499 value
500 .as_record()
501 .ok()
502 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
503 .and_then(|record| record.get("meta"))
504 .and_then(|meta| meta.as_record().ok())
505 .and_then(|meta_record| meta_record.get("actor_id"))
506 .and_then(|id| id.as_str().ok())
507 .filter(|id| *id == actor_id.to_string())
508 .is_some()
509}
510
511fn extract_actor_config(
513 script_config: &NuScriptConfig,
514) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
515 let script_options: ActorScriptOptions = script_config.deserialize_options()?;
517
518 let start =
520 match script_options.start.as_deref() {
521 Some("first") => Start::First,
522 Some("new") => Start::New,
523 Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
524 format!("Invalid scru128 ID for start: {id_str}").into()
525 })?),
526 None => Start::default(), };
528
529 Ok((
531 ActorConfig {
532 start,
533 pulse: script_options.pulse,
534 return_options: script_options.return_options,
535 },
536 script_options.initial,
537 ))
538}