Skip to main content

xs/processor/actor/
actor.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
/// A stream actor: a Nushell closure bound to a topic and driven by frames
/// read from the store.
///
/// Cloning is cheap with respect to the engine and the output buffer, since
/// both are held behind `Arc` and therefore shared between clones.
#[derive(Clone)]
pub struct Actor {
    /// Identifier of this actor; `from_frame` uses the id of the `.create` frame.
    pub id: Scru128Id,
    /// Actor name, i.e. the `<name>` part of `xs.actor.<name>.*` topics.
    pub topic: String,
    /// Options extracted from the actor script (start position, pulse, return options).
    config: ActorConfig,
    /// Handle to the dedicated thread that evaluates the actor closure.
    engine_worker: Arc<EngineWorker>,
    /// Buffer shared with the buffered append command; drained after each evaluation.
    output: Arc<Mutex<Vec<Frame>>>,
}
30
/// Resolved actor configuration, built by `extract_actor_config`.
#[derive(Clone, Debug)]
struct ActorConfig {
    /// Where in the stream the actor begins reading.
    start: Start,
    /// Heartbeat interval in milliseconds; `None` follows the stream without heartbeats.
    pulse: Option<u64>,
    /// Optional customizations (suffix, target, ttl) for frames built from closure output.
    return_options: Option<ReturnOptions>,
}
37
/// Starting position for an actor's read of the stream.
///
/// Serialized in snake_case; the value is included in the `.active` frame meta.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Start {
    /// Read with neither an `after` id nor the `new` flag set.
    First,
    /// Read with the `new` flag set (the default).
    #[default]
    New,
    /// Read frames after the given id.
    After(Scru128Id),
}
46
/// Options that can be deserialized directly from a script.
///
/// Every field is optional; missing fields fall back to `None` through the
/// container-level `#[serde(default)]`.
#[derive(Deserialize, Debug, Default)]
#[serde(default)] // Use default values when fields are missing
struct ActorScriptOptions {
    /// Actor can specify where to start: "first", "new", or a specific ID
    /// (any other string is parsed as a scru128 id).
    start: Option<String>,
    /// Optional heartbeat interval in milliseconds
    pulse: Option<u64>,
    /// Optional customizations for return frames
    return_options: Option<ReturnOptions>,
    /// Initial state for the actor closure's second parameter; when present it
    /// takes precedence over the parameter's own default value.
    initial: Option<serde_json::Value>,
}
60
/// Outcome of one evaluation of the actor closure, as decoded by
/// `interpret_closure_result`.
pub(super) enum ClosureResult {
    /// The closure returned a record containing `next`: keep running.
    Continue {
        /// Value of the record's `out` key, if any.
        output: Option<Value>,
        /// Value of the record's `next` key; becomes the state for the next call.
        next_state: Value,
    },
    /// The closure returned nothing, or a record without `next`: the actor stops.
    Stop {
        /// Value of the record's `out` key, if any.
        output: Option<Value>,
    },
}
70
71impl Actor {
72    pub async fn new(
73        id: Scru128Id,
74        topic: String,
75        mut engine: nu::Engine,
76        expression: String,
77        store: Store,
78    ) -> Result<Self, Error> {
79        let output = Arc::new(Mutex::new(Vec::new()));
80        engine.add_commands(vec![
81            Box::new(commands::cat_command::CatCommand::new(store.clone())),
82            Box::new(commands::last_command::LastCommand::new(store.clone())),
83            Box::new(commands::append_command_buffered::AppendCommand::new(
84                store.clone(),
85                output.clone(),
86            )),
87        ])?;
88
89        // Parse configuration using the new generic API
90        let nu_script_config = nu::parse_config(&mut engine, &expression)?;
91
92        // Deserialize actor-specific options from the full_config_value
93        let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
94
95        // Validate closure signature and resolve initial state
96        let block = engine
97            .state
98            .get_block(nu_script_config.run_closure.block_id);
99        let num_required = block.signature.required_positional.len();
100        let num_optional = block.signature.optional_positional.len();
101
102        let total_positional = num_required + num_optional;
103        if total_positional != 2 {
104            return Err(format!(
105                "Closure must accept exactly 2 params (frame, state), got {total_positional}"
106            )
107            .into());
108        }
109
110        // Resolve initial state: config `initial` > param default > null
111        let span = nu_protocol::Span::unknown();
112        let initial_state = if let Some(json) = initial_json {
113            crate::nu::util::json_to_value(&json, span)
114        } else if num_optional > 0 {
115            let state_param = &block.signature.optional_positional[0];
116            state_param
117                .default_value
118                .clone()
119                .unwrap_or_else(|| Value::nothing(span))
120        } else {
121            Value::nothing(span)
122        };
123
124        let engine_worker = Arc::new(EngineWorker::new(
125            engine,
126            nu_script_config.run_closure,
127            initial_state,
128        ));
129
130        Ok(Self {
131            id,
132            topic,
133            config: actor_config,
134            engine_worker,
135            output,
136        })
137    }
138
139    async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
140        self.engine_worker.eval(frame.clone()).await
141    }
142
143    #[instrument(
144        level = "info",
145        skip(self, frame, store),
146        fields(
147            message = %format!(
148                "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
149                actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
150        )
151    )]
152    async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
153        let result = self.eval_in_thread(frame).await?;
154
155        let (output, should_continue) = match result {
156            ClosureResult::Continue { output, .. } => (output, true),
157            ClosureResult::Stop { output } => (output, false),
158        };
159
160        // Check if the evaluated value is an append frame
161        let additional_frame = match output {
162            Some(ref value)
163                if !is_value_an_append_frame_from_actor(value, &self.id)
164                    && !matches!(value, Value::Nothing { .. }) =>
165            {
166                let return_options = self.config.return_options.as_ref();
167                let suffix = return_options
168                    .and_then(|ro| ro.suffix.as_deref())
169                    .unwrap_or(".out");
170                let use_cas = return_options
171                    .and_then(|ro| ro.target.as_deref())
172                    .is_some_and(|t| t == "cas");
173
174                let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
175                let ttl = return_options.and_then(|ro| ro.ttl.clone());
176
177                if use_cas {
178                    let hash = match value {
179                        Value::Binary { val, .. } => store.cas_insert(val).await?,
180                        _ => store.cas_insert(&value_to_json(value).to_string()).await?,
181                    };
182                    Some(
183                        Frame::builder(topic)
184                            .maybe_ttl(ttl)
185                            .maybe_hash(Some(hash))
186                            .build(),
187                    )
188                } else {
189                    // Default: records go to meta, non-records are an error
190                    match value {
191                        Value::Record { .. } => {
192                            let json = value_to_json(value);
193                            Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
194                        }
195                        _ => {
196                            return Err(format!(
197                                "Actor output must be a record when target is not \"cas\"; got {}. \
198                                 Set return_options.target to \"cas\" for non-record output.",
199                                value.get_type()
200                            )
201                            .into());
202                        }
203                    }
204                }
205            }
206            _ => None,
207        };
208
209        // Process buffered appends and the additional frame
210        let output_to_process: Vec<_> = {
211            let mut output = self.output.lock().unwrap();
212            output
213                .drain(..)
214                .chain(additional_frame.into_iter())
215                .collect()
216        };
217
218        for mut output_frame in output_to_process {
219            let meta_obj = output_frame
220                .meta
221                .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
222                .as_object_mut()
223                .expect("meta should be an object");
224
225            meta_obj.insert(
226                "actor_id".to_string(),
227                serde_json::Value::String(self.id.to_string()),
228            );
229            meta_obj.insert(
230                "frame_id".to_string(),
231                serde_json::Value::String(frame.id.to_string()),
232            );
233
234            let _ = store.append(output_frame);
235        }
236
237        Ok(should_continue)
238    }
239
240    async fn serve(&mut self, store: &Store, options: ReadOptions) {
241        let mut recver = store.read(options).await;
242        let create_topic = format!("xs.actor.{}.create", self.topic);
243        let term_topic = format!("xs.actor.{}.term", self.topic);
244
245        while let Some(frame) = recver.recv().await {
246            // Skip lifecycle activity that occurred before this actor was registered
247            if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
248                continue;
249            }
250
251            // A newer .create wins: this actor steps aside. Emit .replaced
252            // (the successor's .active will be next).
253            if frame.topic == create_topic {
254                let _ = store.append(
255                    Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
256                        .meta(serde_json::json!({
257                            "actor_id": self.id.to_string(),
258                            "frame_id": frame.id.to_string(),
259                        }))
260                        .build(),
261                );
262                break;
263            }
264
265            // User-requested stop.
266            if frame.topic == term_topic {
267                let _ = store.append(
268                    Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
269                        .meta(serde_json::json!({
270                            "actor_id": self.id.to_string(),
271                            "frame_id": frame.id.to_string(),
272                        }))
273                        .build(),
274                );
275                break;
276            }
277
278            // Skip frames that were generated by this actor
279            if frame
280                .meta
281                .as_ref()
282                .and_then(|meta| meta.get("actor_id"))
283                .and_then(|actor_id| actor_id.as_str())
284                .filter(|actor_id| *actor_id == self.id.to_string())
285                .is_some()
286            {
287                continue;
288            }
289
290            match self.process_frame(&frame, store).await {
291                Ok(true) => {}
292                Ok(false) => {
293                    // Actor self-terminated (natural completion).
294                    let _ = store.append(
295                        Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
296                            .meta(serde_json::json!({
297                                "actor_id": self.id.to_string(),
298                                "frame_id": frame.id.to_string(),
299                            }))
300                            .build(),
301                    );
302                    break;
303                }
304                Err(err) => {
305                    // Runtime crash.
306                    let _ = store.append(
307                        Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
308                            .meta(serde_json::json!({
309                                "actor_id": self.id.to_string(),
310                                "frame_id": frame.id.to_string(),
311                                "error": err.to_string(),
312                            }))
313                            .build(),
314                    );
315                    break;
316                }
317            }
318        }
319    }
320
321    pub async fn spawn(&self, store: Store) -> Result<(), Error> {
322        let options = self.configure_read_options().await;
323
324        {
325            let store = store.clone();
326            let options = options.clone();
327            let mut actor = self.clone();
328
329            tokio::spawn(async move {
330                actor.serve(&store, options).await;
331            });
332        }
333
334        let _ = store.append(
335            Frame::builder(format!("xs.actor.{}.active", &self.topic))
336                .meta(serde_json::json!({
337                    "actor_id": self.id.to_string(),
338                    "start": self.config.start,
339                }))
340                .build(),
341        );
342
343        Ok(())
344    }
345
346    pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
347        let topic = frame
348            .topic
349            .strip_prefix("xs.actor.")
350            .and_then(|rest| rest.strip_suffix(".create"))
351            .ok_or("Frame topic must be xs.actor.<name>.create")?;
352
353        // Get hash and read expression
354        let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
355        let mut reader = store
356            .cas_reader(hash.clone())
357            .await
358            .map_err(|e| format!("Failed to get cas reader: {e}"))?;
359
360        let mut expression = String::new();
361        reader
362            .read_to_string(&mut expression)
363            .await
364            .map_err(|e| format!("Failed to read expression: {e}"))?;
365
366        // Build engine from scratch with VFS modules at this point in the stream
367        let engine = crate::processor::build_engine(store, &frame.id)?;
368
369        let actor = Actor::new(
370            frame.id,
371            topic.to_string(),
372            engine,
373            expression,
374            store.clone(),
375        )
376        .await?;
377
378        Ok(actor)
379    }
380
381    async fn configure_read_options(&self) -> ReadOptions {
382        // Determine after and new flag based on Start
383        let (after, is_new) = match &self.config.start {
384            Start::First => (None, false),
385            Start::New => (None, true),
386            Start::After(id) => (Some(*id), false),
387        };
388
389        // Configure follow option based on pulse setting
390        let follow_option = self
391            .config
392            .pulse
393            .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
394            .unwrap_or(FollowOption::On);
395
396        ReadOptions::builder()
397            .follow(follow_option)
398            .new(is_new)
399            .maybe_after(after)
400            .build()
401    }
402}
403
404use tokio::sync::{mpsc, oneshot};
405
/// Handle to a dedicated thread that owns the Nushell engine and the actor's
/// state, and evaluates the closure for one frame at a time.
pub struct EngineWorker {
    /// Sending side of the work queue consumed by the worker thread.
    work_tx: mpsc::Sender<WorkItem>,
}
409
/// One evaluation request sent to the worker thread.
struct WorkItem {
    /// Frame to pass to the closure as its first argument.
    frame: Frame,
    /// One-shot channel on which the worker sends back the evaluation result.
    resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
}
414
415impl EngineWorker {
416    pub fn new(
417        engine: nu::Engine,
418        closure: nu_protocol::engine::Closure,
419        initial_state: Value,
420    ) -> Self {
421        let (work_tx, mut work_rx) = mpsc::channel(32);
422
423        std::thread::spawn(move || {
424            let mut engine = engine;
425            let mut state = initial_state;
426
427            while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
428                let frame_val =
429                    crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
430
431                let pipeline = engine.run_closure_in_job(
432                    &closure,
433                    vec![frame_val, state.clone()],
434                    None,
435                    format!("actor {topic}", topic = frame.topic),
436                );
437
438                let result = pipeline
439                    .map_err(|e| {
440                        let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
441                        Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
442                    })
443                    .and_then(|pd| {
444                        pd.into_value(nu_protocol::Span::unknown())
445                            .map_err(Error::from)
446                    })
447                    .and_then(interpret_closure_result);
448
449                if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
450                    state = next_state.clone();
451                }
452
453                let _ = resp_tx.send(result);
454            }
455        });
456
457        Self { work_tx }
458    }
459
460    pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
461        let (resp_tx, resp_rx) = oneshot::channel();
462        let work_item = WorkItem { frame, resp_tx };
463
464        self.work_tx
465            .send(work_item)
466            .await
467            .map_err(|_| Error::from("Engine worker thread has terminated"))?;
468
469        resp_rx
470            .await
471            .map_err(|_| Error::from("Engine worker thread has terminated"))?
472    }
473}
474
475fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
476    match value {
477        Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
478        Value::Record { ref val, .. } => {
479            for key in val.columns() {
480                if key != "out" && key != "next" {
481                    return Err(format!(
482                        "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
483                    )
484                    .into());
485                }
486            }
487            let output = val.get("out").cloned();
488            match val.get("next").cloned() {
489                Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
490                None => Ok(ClosureResult::Stop { output }),
491            }
492        }
493        _ => Err(format!(
494            "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
495            value.get_type()
496        )
497        .into()),
498    }
499}
500
501fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
502    value
503        .as_record()
504        .ok()
505        .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
506        .and_then(|record| record.get("meta"))
507        .and_then(|meta| meta.as_record().ok())
508        .and_then(|meta_record| meta_record.get("actor_id"))
509        .and_then(|id| id.as_str().ok())
510        .filter(|id| *id == actor_id.to_string())
511        .is_some()
512}
513
514/// Extract actor-specific configuration from the generic NuScriptConfig
515fn extract_actor_config(
516    script_config: &NuScriptConfig,
517) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
518    // Deserialize the actor script options using the deserialize_options method
519    let script_options: ActorScriptOptions = script_config.deserialize_options()?;
520
521    // Process start into the proper enum
522    let start =
523        match script_options.start.as_deref() {
524            Some("first") => Start::First,
525            Some("new") => Start::New,
526            Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
527                format!("Invalid scru128 ID for start: {id_str}").into()
528            })?),
529            None => Start::default(), // Default if not specified in script
530        };
531
532    // Build and return the ActorConfig and initial state
533    Ok((
534        ActorConfig {
535            start,
536            pulse: script_options.pulse,
537            return_options: script_options.return_options,
538        },
539        script_options.initial,
540    ))
541}