Skip to main content

xs/processor/actor/
actor.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::value_to_json;
18use crate::nu::{NuScriptConfig, ReturnOptions};
19use crate::store::{FollowOption, Frame, ReadOptions, Store};
20
/// A stream processor that runs a Nushell closure against frames read from
/// the store.
///
/// `engine_worker` and `output` are behind `Arc`, so clones of an `Actor`
/// share the same worker and the same buffer of pending frames.
#[derive(Clone)]
pub struct Actor {
    /// Identifier of this actor; `from_frame` passes the id of the `.create`
    /// frame here.
    pub id: Scru128Id,
    /// Actor name: the `<name>` part of `xs.actor.<name>.create`.
    pub topic: String,
    /// Start position, heartbeat and return-frame settings taken from the script.
    config: ActorConfig,
    /// Worker that evaluates the closure and carries its state between frames.
    engine_worker: Arc<EngineWorker>,
    /// Buffer handed to the script's buffered `.append` command in `new`;
    /// drained and committed by `process_frame`.
    output: Arc<Mutex<Vec<Frame>>>,
}
29
/// Resolved actor settings, built by `extract_actor_config`.
#[derive(Clone, Debug)]
struct ActorConfig {
    /// Where in the stream the actor begins reading.
    start: Start,
    /// Heartbeat interval in milliseconds; `None` follows the stream without
    /// heartbeats.
    pulse: Option<u64>,
    /// Optional suffix / target / ttl overrides for frames built from the
    /// closure's `out` value.
    return_options: Option<ReturnOptions>,
}
36
/// Position in the stream at which an actor starts reading.
///
/// Serialized in snake_case; the value is included in the `.active` frame's
/// meta by `Actor::spawn`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Start {
    /// Read with neither `after` nor `new` set.
    First,
    /// Read with the `new` flag set (the default).
    #[default]
    New,
    /// Read frames after the given id.
    After(Scru128Id),
}
45
/// Options that can be deserialized directly from a script.
///
/// Every field is optional; `extract_actor_config` turns this raw form into
/// an `ActorConfig` plus the optional initial state.
#[derive(Deserialize, Debug, Default)]
#[serde(default)] // Use default values when fields are missing
struct ActorScriptOptions {
    /// Actor can specify where to start: "first", "new", or a specific ID
    /// (any other string is parsed as a scru128 ID)
    start: Option<String>,
    /// Optional heartbeat interval in milliseconds
    pulse: Option<u64>,
    /// Optional customizations for return frames
    return_options: Option<ReturnOptions>,
    /// Initial state for the actor closure's second parameter; takes
    /// precedence over the parameter's own default value
    initial: Option<serde_json::Value>,
}
59
/// Outcome of one closure evaluation, as decoded by `interpret_closure_result`.
pub(super) enum ClosureResult {
    /// The closure returned a record containing `next`: keep running.
    Continue {
        /// Value of the record's `out` key, if present.
        output: Option<Value>,
        /// Value of the record's `next` key; becomes the state passed to the
        /// next evaluation.
        next_state: Value,
    },
    /// The closure returned nothing, or a record without `next`: the actor
    /// finishes after this frame.
    Stop {
        /// Value of the record's `out` key, if present.
        output: Option<Value>,
    },
}
69
70impl Actor {
71    pub async fn new(
72        id: Scru128Id,
73        topic: String,
74        mut engine: nu::Engine,
75        expression: String,
76        store: Store,
77    ) -> Result<Self, Error> {
78        let output = Arc::new(Mutex::new(Vec::new()));
79        // Reads come from the prepared base; only the per-instance buffered
80        // `.append` (for atomic batch commit) is added here.
81        nu::add_write_commands(
82            &mut engine,
83            &store,
84            nu::AppendMode::Buffered(output.clone()),
85        )?;
86
87        // Parse configuration using the new generic API
88        let nu_script_config = nu::parse_config(&mut engine, &expression)?;
89
90        // Deserialize actor-specific options from the full_config_value
91        let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
92
93        // Validate closure signature and resolve initial state
94        let block = engine
95            .state
96            .get_block(nu_script_config.run_closure.block_id);
97        let num_required = block.signature.required_positional.len();
98        let num_optional = block.signature.optional_positional.len();
99
100        let total_positional = num_required + num_optional;
101        if total_positional != 2 {
102            return Err(format!(
103                "Closure must accept exactly 2 params (frame, state), got {total_positional}"
104            )
105            .into());
106        }
107
108        // Resolve initial state: config `initial` > param default > null
109        let span = nu_protocol::Span::unknown();
110        let initial_state = if let Some(json) = initial_json {
111            crate::nu::util::json_to_value(&json, span)
112        } else if num_optional > 0 {
113            let state_param = &block.signature.optional_positional[0];
114            state_param
115                .default_value
116                .clone()
117                .unwrap_or_else(|| Value::nothing(span))
118        } else {
119            Value::nothing(span)
120        };
121
122        let engine_worker = Arc::new(EngineWorker::new(
123            engine,
124            nu_script_config.run_closure,
125            initial_state,
126        ));
127
128        Ok(Self {
129            id,
130            topic,
131            config: actor_config,
132            engine_worker,
133            output,
134        })
135    }
136
137    async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
138        self.engine_worker.eval(frame.clone()).await
139    }
140
141    #[instrument(
142        level = "info",
143        skip(self, frame, store),
144        fields(
145            message = %format!(
146                "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
147                actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
148        )
149    )]
150    async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
151        let result = self.eval_in_thread(frame).await?;
152
153        let (output, should_continue) = match result {
154            ClosureResult::Continue { output, .. } => (output, true),
155            ClosureResult::Stop { output } => (output, false),
156        };
157
158        // Check if the evaluated value is an append frame
159        let additional_frame = match output {
160            Some(ref value)
161                if !is_value_an_append_frame_from_actor(value, &self.id)
162                    && !matches!(value, Value::Nothing { .. }) =>
163            {
164                let return_options = self.config.return_options.as_ref();
165                let suffix = return_options
166                    .and_then(|ro| ro.suffix.as_deref())
167                    .unwrap_or(".out");
168                let use_cas = return_options
169                    .and_then(|ro| ro.target.as_deref())
170                    .is_some_and(|t| t == "cas");
171
172                let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
173                let ttl = return_options.and_then(|ro| ro.ttl.clone());
174
175                if use_cas {
176                    let hash = match value {
177                        Value::Binary { val, .. } => store.cas_insert(val).await?,
178                        _ => store.cas_insert(&value_to_json(value).to_string()).await?,
179                    };
180                    Some(
181                        Frame::builder(topic)
182                            .maybe_ttl(ttl)
183                            .maybe_hash(Some(hash))
184                            .build(),
185                    )
186                } else {
187                    // Default: records go to meta, non-records are an error
188                    match value {
189                        Value::Record { .. } => {
190                            let json = value_to_json(value);
191                            Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
192                        }
193                        _ => {
194                            return Err(format!(
195                                "Actor output must be a record when target is not \"cas\"; got {}. \
196                                 Set return_options.target to \"cas\" for non-record output.",
197                                value.get_type()
198                            )
199                            .into());
200                        }
201                    }
202                }
203            }
204            _ => None,
205        };
206
207        // Process buffered appends and the additional frame
208        let output_to_process: Vec<_> = {
209            let mut output = self.output.lock().unwrap();
210            output
211                .drain(..)
212                .chain(additional_frame.into_iter())
213                .collect()
214        };
215
216        for mut output_frame in output_to_process {
217            let meta_obj = output_frame
218                .meta
219                .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
220                .as_object_mut()
221                .expect("meta should be an object");
222
223            meta_obj.insert(
224                "actor_id".to_string(),
225                serde_json::Value::String(self.id.to_string()),
226            );
227            meta_obj.insert(
228                "frame_id".to_string(),
229                serde_json::Value::String(frame.id.to_string()),
230            );
231
232            let _ = store.append(output_frame);
233        }
234
235        Ok(should_continue)
236    }
237
238    async fn serve(&mut self, store: &Store, options: ReadOptions) {
239        let mut recver = store.read(options).await;
240        let create_topic = format!("xs.actor.{}.create", self.topic);
241        let term_topic = format!("xs.actor.{}.term", self.topic);
242
243        while let Some(frame) = recver.recv().await {
244            // Skip lifecycle activity that occurred before this actor was registered
245            if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
246                continue;
247            }
248
249            // A newer .create wins: this actor steps aside. Emit .replaced
250            // (the successor's .active will be next).
251            if frame.topic == create_topic {
252                let _ = store.append(
253                    Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
254                        .meta(serde_json::json!({
255                            "actor_id": self.id.to_string(),
256                            "frame_id": frame.id.to_string(),
257                        }))
258                        .build(),
259                );
260                break;
261            }
262
263            // User-requested stop.
264            if frame.topic == term_topic {
265                let _ = store.append(
266                    Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
267                        .meta(serde_json::json!({
268                            "actor_id": self.id.to_string(),
269                            "frame_id": frame.id.to_string(),
270                        }))
271                        .build(),
272                );
273                break;
274            }
275
276            // Skip frames that were generated by this actor
277            if frame
278                .meta
279                .as_ref()
280                .and_then(|meta| meta.get("actor_id"))
281                .and_then(|actor_id| actor_id.as_str())
282                .filter(|actor_id| *actor_id == self.id.to_string())
283                .is_some()
284            {
285                continue;
286            }
287
288            match self.process_frame(&frame, store).await {
289                Ok(true) => {}
290                Ok(false) => {
291                    // Actor self-terminated (natural completion).
292                    let _ = store.append(
293                        Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
294                            .meta(serde_json::json!({
295                                "actor_id": self.id.to_string(),
296                                "frame_id": frame.id.to_string(),
297                            }))
298                            .build(),
299                    );
300                    break;
301                }
302                Err(err) => {
303                    // Runtime crash.
304                    let _ = store.append(
305                        Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
306                            .meta(serde_json::json!({
307                                "actor_id": self.id.to_string(),
308                                "frame_id": frame.id.to_string(),
309                                "error": err.to_string(),
310                            }))
311                            .build(),
312                    );
313                    break;
314                }
315            }
316        }
317    }
318
319    pub async fn spawn(&self, store: Store) -> Result<(), Error> {
320        let options = self.configure_read_options().await;
321
322        {
323            let store = store.clone();
324            let options = options.clone();
325            let mut actor = self.clone();
326
327            tokio::spawn(async move {
328                actor.serve(&store, options).await;
329            });
330        }
331
332        let _ = store.append(
333            Frame::builder(format!("xs.actor.{}.active", &self.topic))
334                .meta(serde_json::json!({
335                    "actor_id": self.id.to_string(),
336                    "start": self.config.start,
337                }))
338                .build(),
339        );
340
341        Ok(())
342    }
343
344    pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
345        let topic = frame
346            .topic
347            .strip_prefix("xs.actor.")
348            .and_then(|rest| rest.strip_suffix(".create"))
349            .ok_or("Frame topic must be xs.actor.<name>.create")?;
350
351        // Get hash and read expression
352        let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
353        let mut reader = store
354            .cas_reader(hash.clone())
355            .await
356            .map_err(|e| format!("Failed to get cas reader: {e}"))?;
357
358        let mut expression = String::new();
359        reader
360            .read_to_string(&mut expression)
361            .await
362            .map_err(|e| format!("Failed to read expression: {e}"))?;
363
364        // Prepared base (Plain reads); the actor's per-instance buffered
365        // `.append` is added in Actor::new. Modules as of this frame.
366        let mut engine = nu::prepared_base(store, nu::ReadMode::Plain, false)?;
367        let modules = store.nu_modules_at(&frame.id);
368        nu::load_modules(&mut engine.state, store, &modules)?;
369
370        let actor = Actor::new(
371            frame.id,
372            topic.to_string(),
373            engine,
374            expression,
375            store.clone(),
376        )
377        .await?;
378
379        Ok(actor)
380    }
381
382    async fn configure_read_options(&self) -> ReadOptions {
383        // Determine after and new flag based on Start
384        let (after, is_new) = match &self.config.start {
385            Start::First => (None, false),
386            Start::New => (None, true),
387            Start::After(id) => (Some(*id), false),
388        };
389
390        // Configure follow option based on pulse setting
391        let follow_option = self
392            .config
393            .pulse
394            .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
395            .unwrap_or(FollowOption::On);
396
397        ReadOptions::builder()
398            .follow(follow_option)
399            .new(is_new)
400            .maybe_after(after)
401            .build()
402    }
403}
404
405use tokio::sync::{mpsc, oneshot};
406
/// Handle to a dedicated OS thread that owns the Nushell engine and the
/// closure state, and evaluates one frame at a time.
pub struct EngineWorker {
    /// Sending side of the work queue consumed by the worker thread.
    work_tx: mpsc::Sender<WorkItem>,
}
410
/// One evaluation request sent to the worker thread.
struct WorkItem {
    /// Frame to pass to the closure as its first argument.
    frame: Frame,
    /// One-shot channel on which the worker sends back the result.
    resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
}
415
416impl EngineWorker {
417    pub fn new(
418        engine: nu::Engine,
419        closure: nu_protocol::engine::Closure,
420        initial_state: Value,
421    ) -> Self {
422        let (work_tx, mut work_rx) = mpsc::channel(32);
423
424        std::thread::spawn(move || {
425            let mut engine = engine;
426            let mut state = initial_state;
427
428            while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
429                let frame_val =
430                    crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
431
432                let pipeline = engine.run_closure_in_job(
433                    &closure,
434                    vec![frame_val, state.clone()],
435                    None,
436                    format!("actor {topic}", topic = frame.topic),
437                );
438
439                let result = pipeline
440                    .map_err(|e| {
441                        let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
442                        Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
443                    })
444                    .and_then(|pd| {
445                        pd.into_value(nu_protocol::Span::unknown())
446                            .map_err(Error::from)
447                    })
448                    .and_then(interpret_closure_result);
449
450                if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
451                    state = next_state.clone();
452                }
453
454                let _ = resp_tx.send(result);
455            }
456        });
457
458        Self { work_tx }
459    }
460
461    pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
462        let (resp_tx, resp_rx) = oneshot::channel();
463        let work_item = WorkItem { frame, resp_tx };
464
465        self.work_tx
466            .send(work_item)
467            .await
468            .map_err(|_| Error::from("Engine worker thread has terminated"))?;
469
470        resp_rx
471            .await
472            .map_err(|_| Error::from("Engine worker thread has terminated"))?
473    }
474}
475
476fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
477    match value {
478        Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
479        Value::Record { ref val, .. } => {
480            for key in val.columns() {
481                if key != "out" && key != "next" {
482                    return Err(format!(
483                        "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
484                    )
485                    .into());
486                }
487            }
488            let output = val.get("out").cloned();
489            match val.get("next").cloned() {
490                Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
491                None => Ok(ClosureResult::Stop { output }),
492            }
493        }
494        _ => Err(format!(
495            "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
496            value.get_type()
497        )
498        .into()),
499    }
500}
501
502fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
503    value
504        .as_record()
505        .ok()
506        .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
507        .and_then(|record| record.get("meta"))
508        .and_then(|meta| meta.as_record().ok())
509        .and_then(|meta_record| meta_record.get("actor_id"))
510        .and_then(|id| id.as_str().ok())
511        .filter(|id| *id == actor_id.to_string())
512        .is_some()
513}
514
515/// Extract actor-specific configuration from the generic NuScriptConfig
516fn extract_actor_config(
517    script_config: &NuScriptConfig,
518) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
519    // Deserialize the actor script options using the deserialize_options method
520    let script_options: ActorScriptOptions = script_config.deserialize_options()?;
521
522    // Process start into the proper enum
523    let start =
524        match script_options.start.as_deref() {
525            Some("first") => Start::First,
526            Some("new") => Start::New,
527            Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
528                format!("Invalid scru128 ID for start: {id_str}").into()
529            })?),
530            None => Start::default(), // Default if not specified in script
531        };
532
533    // Build and return the ActorConfig and initial state
534    Ok((
535        ActorConfig {
536            start,
537            pulse: script_options.pulse,
538            return_options: script_options.return_options,
539        },
540        script_options.initial,
541    ))
542}