1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use tokio::io::AsyncReadExt;
8
9use nu_protocol::Value;
10
11use scru128::Scru128Id;
12
13use crate::error::Error;
14use crate::nu;
15use crate::nu::value_to_json;
16use crate::nu::{NuScriptConfig, ReturnOptions};
17use crate::store::{FollowOption, Frame, ReadOptions, Store};
18
19#[derive(Clone)]
20pub struct Actor {
21 pub id: Scru128Id,
22 pub topic: String,
23 config: ActorConfig,
24 engine: nu::Engine,
25 closure: nu_protocol::engine::Closure,
26 initial_state: Value,
27 output: Arc<Mutex<Vec<Frame>>>,
28}
29
30#[derive(Clone, Debug)]
31struct ActorConfig {
32 start: Start,
33 pulse: Option<u64>,
34 return_options: Option<ReturnOptions>,
35 topics: Option<Vec<String>>,
41}
42
43#[derive(Clone, Debug, Default, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45enum Start {
46 First,
47 #[default]
48 New,
49 After(Scru128Id),
50}
51
52#[derive(Deserialize, Debug, Default)]
54#[serde(default)] struct ActorScriptOptions {
56 start: Option<String>,
58 pulse: Option<u64>,
60 return_options: Option<ReturnOptions>,
62 initial: Option<serde_json::Value>,
64 topics: Option<TopicsSpec>,
67}
68
69#[derive(Deserialize, Debug)]
72#[serde(untagged)]
73enum TopicsSpec {
74 One(String),
75 Many(Vec<String>),
76}
77
78impl TopicsSpec {
79 fn into_patterns(self) -> Vec<String> {
82 let parts: Vec<String> = match self {
83 TopicsSpec::One(s) => vec![s],
84 TopicsSpec::Many(v) => v,
85 };
86 parts
87 .iter()
88 .flat_map(|s| s.split(','))
89 .map(|p| p.trim().to_string())
90 .filter(|p| !p.is_empty())
91 .collect()
92 }
93}
94
95pub(super) enum ClosureResult {
96 Continue {
97 output: Option<Value>,
98 next_state: Value,
99 },
100 Stop {
101 output: Option<Value>,
102 },
103}
104
105impl Actor {
106 pub async fn new(
107 id: Scru128Id,
108 topic: String,
109 mut engine: nu::Engine,
110 expression: String,
111 store: Store,
112 ) -> Result<Self, Error> {
113 let output = Arc::new(Mutex::new(Vec::new()));
114 nu::add_write_commands(
117 &mut engine,
118 &store,
119 nu::AppendMode::Buffered(output.clone()),
120 )?;
121
122 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
124
125 let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
127
128 let block = engine
130 .state
131 .get_block(nu_script_config.run_closure.block_id);
132 let num_required = block.signature.required_positional.len();
133 let num_optional = block.signature.optional_positional.len();
134
135 let total_positional = num_required + num_optional;
136 if total_positional != 2 {
137 return Err(format!(
138 "Closure must accept exactly 2 params (frame, state), got {total_positional}"
139 )
140 .into());
141 }
142
143 let span = nu_protocol::Span::unknown();
145 let initial_state = if let Some(json) = initial_json {
146 crate::nu::util::json_to_value(&json, span)
147 } else if num_optional > 0 {
148 let state_param = &block.signature.optional_positional[0];
149 state_param
150 .default_value
151 .clone()
152 .unwrap_or_else(|| Value::nothing(span))
153 } else {
154 Value::nothing(span)
155 };
156
157 Ok(Self {
158 id,
159 topic,
160 config: actor_config,
161 engine,
162 closure: nu_script_config.run_closure,
163 initial_state,
164 output,
165 })
166 }
167
168 fn eval_frame(&mut self, frame: &Frame, state: &Value) -> Result<ClosureResult, Error> {
174 let frame_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
175 self.engine
176 .eval_closure_no_job(&self.closure, vec![frame_val, state.clone()], None)
177 .map_err(|e| {
178 let working_set = nu_protocol::engine::StateWorkingSet::new(&self.engine.state);
179 Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
180 })
181 .and_then(|pd| {
182 pd.into_value(nu_protocol::Span::unknown())
183 .map_err(Error::from)
184 })
185 .and_then(interpret_closure_result)
186 }
187
188 fn process_frame(
189 &mut self,
190 frame: &Frame,
191 store: &Store,
192 state: &mut Value,
193 ) -> Result<bool, Error> {
194 let result = self.eval_frame(frame, state)?;
195
196 let (output, should_continue) = match result {
197 ClosureResult::Continue {
198 output,
199 ref next_state,
200 } => {
201 *state = next_state.clone();
202 (output, true)
203 }
204 ClosureResult::Stop { output } => (output, false),
205 };
206
207 let additional_frame = match output {
209 Some(ref value)
210 if !is_value_an_append_frame_from_actor(value, &self.id)
211 && !matches!(value, Value::Nothing { .. }) =>
212 {
213 let return_options = self.config.return_options.as_ref();
214 let suffix = return_options
215 .and_then(|ro| ro.suffix.as_deref())
216 .unwrap_or(".out");
217 let use_cas = return_options
218 .and_then(|ro| ro.target.as_deref())
219 .is_some_and(|t| t == "cas");
220
221 let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
222 let ttl = return_options.and_then(|ro| ro.ttl.clone());
223
224 if use_cas {
225 let hash = match value {
226 Value::Binary { val, .. } => store.cas_insert_sync(val)?,
227 _ => store.cas_insert_sync(value_to_json(value).to_string())?,
228 };
229 Some(
230 Frame::builder(topic)
231 .maybe_ttl(ttl)
232 .maybe_hash(Some(hash))
233 .build(),
234 )
235 } else {
236 match value {
238 Value::Record { .. } => {
239 let json = value_to_json(value);
240 Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
241 }
242 _ => {
243 return Err(format!(
244 "Actor output must be a record when target is not \"cas\"; got {}. \
245 Set return_options.target to \"cas\" for non-record output.",
246 value.get_type()
247 )
248 .into());
249 }
250 }
251 }
252 }
253 _ => None,
254 };
255
256 let output_to_process: Vec<_> = {
258 let mut output = self.output.lock().unwrap();
259 output.drain(..).chain(additional_frame).collect()
260 };
261
262 for mut output_frame in output_to_process {
263 let meta_obj = output_frame
264 .meta
265 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
266 .as_object_mut()
267 .expect("meta should be an object");
268
269 meta_obj.insert(
270 "actor_id".to_string(),
271 serde_json::Value::String(self.id.to_string()),
272 );
273 meta_obj.insert(
274 "frame_id".to_string(),
275 serde_json::Value::String(frame.id.to_string()),
276 );
277
278 let _ = store.append(output_frame);
279 }
280
281 Ok(should_continue)
282 }
283
284 fn run_blocking(mut self, mut recver: mpsc::Receiver<Frame>, store: Store) {
290 self.engine.attach_background_job("actor");
292 let mut state = self.initial_state.clone();
293
294 let create_topic = format!("xs.actor.{}.create", self.topic);
295 let term_topic = format!("xs.actor.{}.term", self.topic);
296 let store = &store;
297
298 while let Some(frame) = recver.blocking_recv() {
299 if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
301 continue;
302 }
303
304 if frame.topic == create_topic {
307 let _ = store.append(
308 Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
309 .meta(serde_json::json!({
310 "actor_id": self.id.to_string(),
311 "frame_id": frame.id.to_string(),
312 }))
313 .build(),
314 );
315 break;
316 }
317
318 if frame.topic == term_topic {
320 let _ = store.append(
321 Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
322 .meta(serde_json::json!({
323 "actor_id": self.id.to_string(),
324 "frame_id": frame.id.to_string(),
325 }))
326 .build(),
327 );
328 break;
329 }
330
331 if frame
333 .meta
334 .as_ref()
335 .and_then(|meta| meta.get("actor_id"))
336 .and_then(|actor_id| actor_id.as_str())
337 .filter(|actor_id| *actor_id == self.id.to_string())
338 .is_some()
339 {
340 continue;
341 }
342
343 match self.process_frame(&frame, store, &mut state) {
344 Ok(true) => {}
345 Ok(false) => {
346 let _ = store.append(
348 Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
349 .meta(serde_json::json!({
350 "actor_id": self.id.to_string(),
351 "frame_id": frame.id.to_string(),
352 }))
353 .build(),
354 );
355 break;
356 }
357 Err(err) => {
358 let _ = store.append(
360 Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
361 .meta(serde_json::json!({
362 "actor_id": self.id.to_string(),
363 "frame_id": frame.id.to_string(),
364 "error": err.to_string(),
365 }))
366 .build(),
367 );
368 break;
369 }
370 }
371 }
372 }
373
374 pub async fn spawn(self, store: Store) -> Result<(), Error> {
375 let options = self.configure_read_options().await;
376 let recver = store.read(options).await;
381
382 let _ = store.append(
383 Frame::builder(format!("xs.actor.{}.active", &self.topic))
384 .meta(serde_json::json!({
385 "actor_id": self.id.to_string(),
386 "start": self.config.start,
387 }))
388 .build(),
389 );
390
391 let store_for_loop = store.clone();
392 std::thread::Builder::new()
393 .name(format!("actor-{}", self.topic))
394 .spawn(move || {
395 self.run_blocking(recver, store_for_loop);
396 })
397 .map_err(|e| Error::from(format!("Failed to spawn actor thread: {e}")))?;
398
399 Ok(())
400 }
401
402 pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
403 let topic = frame
404 .topic
405 .strip_prefix("xs.actor.")
406 .and_then(|rest| rest.strip_suffix(".create"))
407 .ok_or("Frame topic must be xs.actor.<name>.create")?;
408
409 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
411 let mut reader = store
412 .cas_reader(hash.clone())
413 .await
414 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
415
416 let mut expression = String::new();
417 reader
418 .read_to_string(&mut expression)
419 .await
420 .map_err(|e| format!("Failed to read expression: {e}"))?;
421
422 let mut engine = nu::prepared_base(store, nu::ReadMode::Plain, false)?;
425 let modules = store.nu_modules_at(&frame.id);
426 nu::load_modules(&mut engine.state, store, &modules)?;
427
428 let actor = Actor::new(
429 frame.id,
430 topic.to_string(),
431 engine,
432 expression,
433 store.clone(),
434 )
435 .await?;
436
437 Ok(actor)
438 }
439
440 async fn configure_read_options(&self) -> ReadOptions {
441 let (after, is_new) = match &self.config.start {
443 Start::First => (None, false),
444 Start::New => (None, true),
445 Start::After(id) => (Some(*id), false),
446 };
447
448 let follow_option = self
450 .config
451 .pulse
452 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
453 .unwrap_or(FollowOption::On);
454
455 let topic = self.config.topics.as_ref().map(|patterns| {
462 let mut patterns = patterns.clone();
463 patterns.push(format!("xs.actor.{}.create", self.topic));
464 patterns.push(format!("xs.actor.{}.term", self.topic));
465 patterns.join(",")
466 });
467
468 ReadOptions::builder()
469 .follow(follow_option)
470 .new(is_new)
471 .maybe_after(after)
472 .maybe_topic(topic)
473 .build()
474 }
475}
476
477use tokio::sync::mpsc;
478
479fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
480 match value {
481 Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
482 Value::Record { ref val, .. } => {
483 for key in val.columns() {
484 if key != "out" && key != "next" {
485 return Err(format!(
486 "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
487 )
488 .into());
489 }
490 }
491 let output = val.get("out").cloned();
492 match val.get("next").cloned() {
493 Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
494 None => Ok(ClosureResult::Stop { output }),
495 }
496 }
497 _ => Err(format!(
498 "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
499 value.get_type()
500 )
501 .into()),
502 }
503}
504
505fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
506 value
507 .as_record()
508 .ok()
509 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
510 .and_then(|record| record.get("meta"))
511 .and_then(|meta| meta.as_record().ok())
512 .and_then(|meta_record| meta_record.get("actor_id"))
513 .and_then(|id| id.as_str().ok())
514 .filter(|id| *id == actor_id.to_string())
515 .is_some()
516}
517
518fn extract_actor_config(
520 script_config: &NuScriptConfig,
521) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
522 let script_options: ActorScriptOptions = script_config.deserialize_options()?;
524
525 let start =
527 match script_options.start.as_deref() {
528 Some("first") => Start::First,
529 Some("new") => Start::New,
530 Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
531 format!("Invalid scru128 ID for start: {id_str}").into()
532 })?),
533 None => Start::default(), };
535
536 Ok((
538 ActorConfig {
539 start,
540 pulse: script_options.pulse,
541 return_options: script_options.return_options,
542 topics: script_options.topics.map(TopicsSpec::into_patterns),
543 },
544 script_options.initial,
545 ))
546}