Skip to main content

xs/processor/actor/
actor.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6
7use tokio::io::AsyncReadExt;
8
9use nu_protocol::Value;
10
11use scru128::Scru128Id;
12
13use crate::error::Error;
14use crate::nu;
15use crate::nu::value_to_json;
16use crate::nu::{NuScriptConfig, ReturnOptions};
17use crate::store::{FollowOption, Frame, ReadOptions, Store};
18
19#[derive(Clone)]
20pub struct Actor {
21    pub id: Scru128Id,
22    pub topic: String,
23    config: ActorConfig,
24    engine: nu::Engine,
25    closure: nu_protocol::engine::Closure,
26    initial_state: Value,
27    output: Arc<Mutex<Vec<Frame>>>,
28}
29
30#[derive(Clone, Debug)]
31struct ActorConfig {
32    start: Start,
33    pulse: Option<u64>,
34    return_options: Option<ReturnOptions>,
35    /// Optional topic filter, normalized to one pattern per element. Applied
36    /// at the read level (see `configure_read_options`): frames whose topic
37    /// matches none of these patterns never reach the actor loop. Each
38    /// pattern is an exact topic, a `prefix.*` wildcard, or `*`, the same
39    /// pattern language as `ReadOptions::topic`. Absent means all frames.
40    topics: Option<Vec<String>>,
41}
42
43#[derive(Clone, Debug, Default, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45enum Start {
46    First,
47    #[default]
48    New,
49    After(Scru128Id),
50}
51
52/// Options that can be deserialized directly from a script.
53#[derive(Deserialize, Debug, Default)]
54#[serde(default)] // Use default values when fields are missing
55struct ActorScriptOptions {
56    /// Actor can specify where to start: "first", "new", or a specific ID
57    start: Option<String>,
58    /// Optional heartbeat interval in milliseconds
59    pulse: Option<u64>,
60    /// Optional customizations for return frames
61    return_options: Option<ReturnOptions>,
62    /// Initial state for the actor closure's second parameter
63    initial: Option<serde_json::Value>,
64    /// Optional list of topic patterns the actor cares about. Accepts a
65    /// nushell list of strings or a single comma-separated string.
66    topics: Option<TopicsSpec>,
67}
68
69/// A topics spec from the actor script: a single string (commas allowed) or
70/// a list of strings.
71#[derive(Deserialize, Debug)]
72#[serde(untagged)]
73enum TopicsSpec {
74    One(String),
75    Many(Vec<String>),
76}
77
78impl TopicsSpec {
79    /// Normalize to one pattern per element, splitting comma-separated
80    /// strings and dropping empty elements.
81    fn into_patterns(self) -> Vec<String> {
82        let parts: Vec<String> = match self {
83            TopicsSpec::One(s) => vec![s],
84            TopicsSpec::Many(v) => v,
85        };
86        parts
87            .iter()
88            .flat_map(|s| s.split(','))
89            .map(|p| p.trim().to_string())
90            .filter(|p| !p.is_empty())
91            .collect()
92    }
93}
94
95pub(super) enum ClosureResult {
96    Continue {
97        output: Option<Value>,
98        next_state: Value,
99    },
100    Stop {
101        output: Option<Value>,
102    },
103}
104
105impl Actor {
106    pub async fn new(
107        id: Scru128Id,
108        topic: String,
109        mut engine: nu::Engine,
110        expression: String,
111        store: Store,
112    ) -> Result<Self, Error> {
113        let output = Arc::new(Mutex::new(Vec::new()));
114        // Reads come from the prepared base; only the per-instance buffered
115        // `.append` (for atomic batch commit) is added here.
116        nu::add_write_commands(
117            &mut engine,
118            &store,
119            nu::AppendMode::Buffered(output.clone()),
120        )?;
121
122        // Parse configuration using the new generic API
123        let nu_script_config = nu::parse_config(&mut engine, &expression)?;
124
125        // Deserialize actor-specific options from the full_config_value
126        let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;
127
128        // Validate closure signature and resolve initial state
129        let block = engine
130            .state
131            .get_block(nu_script_config.run_closure.block_id);
132        let num_required = block.signature.required_positional.len();
133        let num_optional = block.signature.optional_positional.len();
134
135        let total_positional = num_required + num_optional;
136        if total_positional != 2 {
137            return Err(format!(
138                "Closure must accept exactly 2 params (frame, state), got {total_positional}"
139            )
140            .into());
141        }
142
143        // Resolve initial state: config `initial` > param default > null
144        let span = nu_protocol::Span::unknown();
145        let initial_state = if let Some(json) = initial_json {
146            crate::nu::util::json_to_value(&json, span)
147        } else if num_optional > 0 {
148            let state_param = &block.signature.optional_positional[0];
149            state_param
150                .default_value
151                .clone()
152                .unwrap_or_else(|| Value::nothing(span))
153        } else {
154            Value::nothing(span)
155        };
156
157        Ok(Self {
158            id,
159            topic,
160            config: actor_config,
161            engine,
162            closure: nu_script_config.run_closure,
163            initial_state,
164            output,
165        })
166    }
167
168    /// Evaluate the actor closure for one frame, inline on the calling
169    /// thread. The whole actor loop runs on a dedicated OS thread (see
170    /// `run_blocking`), so there is no async->worker->oneshot handoff per
171    /// frame -- that round trip (two context switches) was the dominant
172    /// per-frame cost in backlog replay. `state` is threaded by the caller.
173    fn eval_frame(&mut self, frame: &Frame, state: &Value) -> Result<ClosureResult, Error> {
174        let frame_val = crate::nu::frame_to_value(frame, nu_protocol::Span::unknown(), false);
175        self.engine
176            .eval_closure_no_job(&self.closure, vec![frame_val, state.clone()], None)
177            .map_err(|e| {
178                let working_set = nu_protocol::engine::StateWorkingSet::new(&self.engine.state);
179                Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
180            })
181            .and_then(|pd| {
182                pd.into_value(nu_protocol::Span::unknown())
183                    .map_err(Error::from)
184            })
185            .and_then(interpret_closure_result)
186    }
187
188    fn process_frame(
189        &mut self,
190        frame: &Frame,
191        store: &Store,
192        state: &mut Value,
193    ) -> Result<bool, Error> {
194        let result = self.eval_frame(frame, state)?;
195
196        let (output, should_continue) = match result {
197            ClosureResult::Continue {
198                output,
199                ref next_state,
200            } => {
201                *state = next_state.clone();
202                (output, true)
203            }
204            ClosureResult::Stop { output } => (output, false),
205        };
206
207        // Check if the evaluated value is an append frame
208        let additional_frame = match output {
209            Some(ref value)
210                if !is_value_an_append_frame_from_actor(value, &self.id)
211                    && !matches!(value, Value::Nothing { .. }) =>
212            {
213                let return_options = self.config.return_options.as_ref();
214                let suffix = return_options
215                    .and_then(|ro| ro.suffix.as_deref())
216                    .unwrap_or(".out");
217                let use_cas = return_options
218                    .and_then(|ro| ro.target.as_deref())
219                    .is_some_and(|t| t == "cas");
220
221                let topic = format!("{topic}{suffix}", topic = self.topic, suffix = suffix);
222                let ttl = return_options.and_then(|ro| ro.ttl.clone());
223
224                if use_cas {
225                    let hash = match value {
226                        Value::Binary { val, .. } => store.cas_insert_sync(val)?,
227                        _ => store.cas_insert_sync(value_to_json(value).to_string())?,
228                    };
229                    Some(
230                        Frame::builder(topic)
231                            .maybe_ttl(ttl)
232                            .maybe_hash(Some(hash))
233                            .build(),
234                    )
235                } else {
236                    // Default: records go to meta, non-records are an error
237                    match value {
238                        Value::Record { .. } => {
239                            let json = value_to_json(value);
240                            Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
241                        }
242                        _ => {
243                            return Err(format!(
244                                "Actor output must be a record when target is not \"cas\"; got {}. \
245                                 Set return_options.target to \"cas\" for non-record output.",
246                                value.get_type()
247                            )
248                            .into());
249                        }
250                    }
251                }
252            }
253            _ => None,
254        };
255
256        // Process buffered appends and the additional frame
257        let output_to_process: Vec<_> = {
258            let mut output = self.output.lock().unwrap();
259            output.drain(..).chain(additional_frame).collect()
260        };
261
262        for mut output_frame in output_to_process {
263            let meta_obj = output_frame
264                .meta
265                .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
266                .as_object_mut()
267                .expect("meta should be an object");
268
269            meta_obj.insert(
270                "actor_id".to_string(),
271                serde_json::Value::String(self.id.to_string()),
272            );
273            meta_obj.insert(
274                "frame_id".to_string(),
275                serde_json::Value::String(frame.id.to_string()),
276            );
277
278            let _ = store.append(output_frame);
279        }
280
281        Ok(should_continue)
282    }
283
284    /// The actor's hot loop, run on a dedicated OS thread. Pulls frames with
285    /// blocking_recv (no async runtime hop) and evaluates each inline -- the
286    /// per-frame async->worker->oneshot round trip is gone. State is threaded
287    /// here and passed by `&mut` into `process_frame`. All store ops on this
288    /// path (`append`, `cas_insert_sync`) are synchronous.
289    fn run_blocking(mut self, mut recver: mpsc::Receiver<Frame>, store: Store) {
290        // One long-lived background job, so per-frame eval can skip job churn.
291        self.engine.attach_background_job("actor");
292        let mut state = self.initial_state.clone();
293
294        let create_topic = format!("xs.actor.{}.create", self.topic);
295        let term_topic = format!("xs.actor.{}.term", self.topic);
296        let store = &store;
297
298        while let Some(frame) = recver.blocking_recv() {
299            // Skip lifecycle activity that occurred before this actor was registered
300            if (frame.topic == create_topic || frame.topic == term_topic) && frame.id <= self.id {
301                continue;
302            }
303
304            // A newer .create wins: this actor steps aside. Emit .replaced
305            // (the successor's .active will be next).
306            if frame.topic == create_topic {
307                let _ = store.append(
308                    Frame::builder(format!("xs.actor.{}.replaced", &self.topic))
309                        .meta(serde_json::json!({
310                            "actor_id": self.id.to_string(),
311                            "frame_id": frame.id.to_string(),
312                        }))
313                        .build(),
314                );
315                break;
316            }
317
318            // User-requested stop.
319            if frame.topic == term_topic {
320                let _ = store.append(
321                    Frame::builder(format!("xs.actor.{}.fin.term", &self.topic))
322                        .meta(serde_json::json!({
323                            "actor_id": self.id.to_string(),
324                            "frame_id": frame.id.to_string(),
325                        }))
326                        .build(),
327                );
328                break;
329            }
330
331            // Skip frames that were generated by this actor
332            if frame
333                .meta
334                .as_ref()
335                .and_then(|meta| meta.get("actor_id"))
336                .and_then(|actor_id| actor_id.as_str())
337                .filter(|actor_id| *actor_id == self.id.to_string())
338                .is_some()
339            {
340                continue;
341            }
342
343            match self.process_frame(&frame, store, &mut state) {
344                Ok(true) => {}
345                Ok(false) => {
346                    // Actor self-terminated (natural completion).
347                    let _ = store.append(
348                        Frame::builder(format!("xs.actor.{}.fin.ok", &self.topic))
349                            .meta(serde_json::json!({
350                                "actor_id": self.id.to_string(),
351                                "frame_id": frame.id.to_string(),
352                            }))
353                            .build(),
354                    );
355                    break;
356                }
357                Err(err) => {
358                    // Runtime crash.
359                    let _ = store.append(
360                        Frame::builder(format!("xs.actor.{}.fin.error", &self.topic))
361                            .meta(serde_json::json!({
362                                "actor_id": self.id.to_string(),
363                                "frame_id": frame.id.to_string(),
364                                "error": err.to_string(),
365                            }))
366                            .build(),
367                    );
368                    break;
369                }
370            }
371        }
372    }
373
374    pub async fn spawn(self, store: Store) -> Result<(), Error> {
375        let options = self.configure_read_options().await;
376        // Set up the read stream on the async runtime, then run the actor's
377        // loop on a dedicated OS thread that drains it with blocking_recv.
378        // This keeps the synchronous nushell eval off the tokio runtime
379        // without paying a per-frame thread handoff.
380        let recver = store.read(options).await;
381
382        let _ = store.append(
383            Frame::builder(format!("xs.actor.{}.active", &self.topic))
384                .meta(serde_json::json!({
385                    "actor_id": self.id.to_string(),
386                    "start": self.config.start,
387                }))
388                .build(),
389        );
390
391        let store_for_loop = store.clone();
392        std::thread::Builder::new()
393            .name(format!("actor-{}", self.topic))
394            .spawn(move || {
395                self.run_blocking(recver, store_for_loop);
396            })
397            .map_err(|e| Error::from(format!("Failed to spawn actor thread: {e}")))?;
398
399        Ok(())
400    }
401
402    pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
403        let topic = frame
404            .topic
405            .strip_prefix("xs.actor.")
406            .and_then(|rest| rest.strip_suffix(".create"))
407            .ok_or("Frame topic must be xs.actor.<name>.create")?;
408
409        // Get hash and read expression
410        let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
411        let mut reader = store
412            .cas_reader(hash.clone())
413            .await
414            .map_err(|e| format!("Failed to get cas reader: {e}"))?;
415
416        let mut expression = String::new();
417        reader
418            .read_to_string(&mut expression)
419            .await
420            .map_err(|e| format!("Failed to read expression: {e}"))?;
421
422        // Prepared base (Plain reads); the actor's per-instance buffered
423        // `.append` is added in Actor::new. Modules as of this frame.
424        let mut engine = nu::prepared_base(store, nu::ReadMode::Plain, false)?;
425        let modules = store.nu_modules_at(&frame.id);
426        nu::load_modules(&mut engine.state, store, &modules)?;
427
428        let actor = Actor::new(
429            frame.id,
430            topic.to_string(),
431            engine,
432            expression,
433            store.clone(),
434        )
435        .await?;
436
437        Ok(actor)
438    }
439
440    async fn configure_read_options(&self) -> ReadOptions {
441        // Determine after and new flag based on Start
442        let (after, is_new) = match &self.config.start {
443            Start::First => (None, false),
444            Start::New => (None, true),
445            Start::After(id) => (Some(*id), false),
446        };
447
448        // Configure follow option based on pulse setting
449        let follow_option = self
450            .config
451            .pulse
452            .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
453            .unwrap_or(FollowOption::On);
454
455        // Apply the actor's topic filter at the read level: non-matching
456        // frames are skipped inside the store and never reach the actor
457        // loop. The actor's own lifecycle topics are always included so the
458        // loop still sees .create/.term frames; synthetic frames (the
459        // heartbeat xs.pulse when `pulse` is set, and xs.threshold) bypass
460        // read-level topic filtering entirely, so they are unaffected.
461        let topic = self.config.topics.as_ref().map(|patterns| {
462            let mut patterns = patterns.clone();
463            patterns.push(format!("xs.actor.{}.create", self.topic));
464            patterns.push(format!("xs.actor.{}.term", self.topic));
465            patterns.join(",")
466        });
467
468        ReadOptions::builder()
469            .follow(follow_option)
470            .new(is_new)
471            .maybe_after(after)
472            .maybe_topic(topic)
473            .build()
474    }
475}
476
477use tokio::sync::mpsc;
478
479fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
480    match value {
481        Value::Nothing { .. } => Ok(ClosureResult::Stop { output: None }),
482        Value::Record { ref val, .. } => {
483            for key in val.columns() {
484                if key != "out" && key != "next" {
485                    return Err(format!(
486                        "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
487                    )
488                    .into());
489                }
490            }
491            let output = val.get("out").cloned();
492            match val.get("next").cloned() {
493                Some(next_state) => Ok(ClosureResult::Continue { output, next_state }),
494                None => Ok(ClosureResult::Stop { output }),
495            }
496        }
497        _ => Err(format!(
498            "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
499            value.get_type()
500        )
501        .into()),
502    }
503}
504
505fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
506    value
507        .as_record()
508        .ok()
509        .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
510        .and_then(|record| record.get("meta"))
511        .and_then(|meta| meta.as_record().ok())
512        .and_then(|meta_record| meta_record.get("actor_id"))
513        .and_then(|id| id.as_str().ok())
514        .filter(|id| *id == actor_id.to_string())
515        .is_some()
516}
517
518/// Extract actor-specific configuration from the generic NuScriptConfig
519fn extract_actor_config(
520    script_config: &NuScriptConfig,
521) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
522    // Deserialize the actor script options using the deserialize_options method
523    let script_options: ActorScriptOptions = script_config.deserialize_options()?;
524
525    // Process start into the proper enum
526    let start =
527        match script_options.start.as_deref() {
528            Some("first") => Start::First,
529            Some("new") => Start::New,
530            Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
531                format!("Invalid scru128 ID for start: {id_str}").into()
532            })?),
533            None => Start::default(), // Default if not specified in script
534        };
535
536    // Build and return the ActorConfig and initial state
537    Ok((
538        ActorConfig {
539            start,
540            pulse: script_options.pulse,
541            return_options: script_options.return_options,
542            topics: script_options.topics.map(TopicsSpec::into_patterns),
543        },
544        script_options.initial,
545    ))
546}