use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::instrument;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use nu_protocol::Value;
use scru128::Scru128Id;
use crate::error::Error;
use crate::nu;
use crate::nu::commands;
use crate::nu::value_to_json;
use crate::nu::{NuScriptConfig, ReturnOptions};
use crate::store::{FollowOption, Frame, ReadOptions, Store};
/// A registered actor: a Nushell closure that is evaluated once per frame read from the
/// store, carrying a state value from one evaluation to the next.
///
/// Cloning is shallow for `engine_worker` and `output` (both are `Arc`s), so every clone
/// talks to the same worker thread and shares the same output buffer.
#[derive(Clone)]
pub struct Actor {
/// Identifier of this actor; `from_frame` uses the id of the `.register` frame.
pub id: Scru128Id,
/// Base topic; lifecycle and output topics are built by appending a suffix to it
/// (for example `.active`, `.unregistered`, or the configured return suffix).
pub topic: String,
// Settings taken from the script options (start position, pulse, return options).
config: ActorConfig,
// Handle to the dedicated thread that evaluates the closure.
engine_worker: Arc<EngineWorker>,
// Frames handed to the buffered append command during an evaluation; drained and
// appended to the store by `process_frame`.
output: Arc<Mutex<Vec<Frame>>>,
}
/// Resolved actor settings, produced by `extract_actor_config` from the script options.
#[derive(Clone, Debug)]
struct ActorConfig {
// Where in the stream the actor starts reading.
start: Start,
// Heartbeat interval in milliseconds; when set, the store is read with
// `FollowOption::WithHeartbeat`, otherwise with `FollowOption::On`.
pulse: Option<u64>,
// Controls how the closure's `out` value is emitted (`suffix`, `target`, `ttl`).
return_options: Option<ReturnOptions>,
}
/// Starting position for the actor's read of the store.
///
/// Serialized in snake_case; the value is included in the `.active` frame's meta.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Start {
// Read with `new = false` and no `after` id (presumably from the start of the stream).
First,
// Default. Read with `new = true` (presumably only frames appended from now on).
#[default]
New,
// Read frames after the given frame id.
After(Scru128Id),
}
/// Raw options as deserialized from the actor script, before validation.
///
/// Every field is optional; `#[serde(default)]` fills absent fields with `None`.
#[derive(Deserialize, Debug, Default)]
#[serde(default)] struct ActorScriptOptions {
// "first", "new", or a scru128 id string; anything else is rejected by
// `extract_actor_config`.
start: Option<String>,
// Heartbeat interval in milliseconds.
pulse: Option<u64>,
// Passed through unchanged into `ActorConfig`.
return_options: Option<ReturnOptions>,
// Explicit initial state; when present it takes precedence over the default value
// declared on the closure's state parameter.
initial: Option<serde_json::Value>,
}
/// Interpreted return value of one closure evaluation (see `interpret_closure_result`).
pub(super) enum ClosureResult {
/// The closure returned a record containing `next`: keep serving.
Continue {
// Value of the record's `out` key, if present.
output: Option<Value>,
// Value of the record's `next` key; becomes the state for the next evaluation.
next_state: Value,
},
/// The closure returned nothing, or a record without `next`: stop serving.
Stop {
// Value of the record's `out` key, if present.
output: Option<Value>,
},
}
impl Actor {
/// Builds an actor from a Nushell script.
///
/// Registers the store-backed commands on `engine`, parses `expression`, validates the
/// closure's arity, resolves the initial state and starts the evaluation worker.
///
/// The initial state is, in order of precedence:
/// 1. the `initial` script option, converted from JSON;
/// 2. the default value declared on the closure's state parameter;
/// 3. `nothing`.
///
/// # Errors
///
/// Returns an error when the commands cannot be registered, the script or its options
/// cannot be parsed, or the closure does not accept exactly two positional parameters.
pub async fn new(
    id: Scru128Id,
    topic: String,
    mut engine: nu::Engine,
    expression: String,
    store: Store,
) -> Result<Self, Error> {
    // Shared with the buffered append command so that frames it collects during an
    // evaluation can be drained by `process_frame` afterwards.
    let output = Arc::new(Mutex::new(Vec::new()));
    engine.add_commands(vec![
        Box::new(commands::cat_command::CatCommand::new(store.clone())),
        Box::new(commands::last_command::LastCommand::new(store.clone())),
        Box::new(commands::append_command_buffered::AppendCommand::new(
            store.clone(),
            output.clone(),
        )),
    ])?;

    let nu_script_config = nu::parse_config(&mut engine, &expression)?;
    let (actor_config, initial_json) = extract_actor_config(&nu_script_config)?;

    let block = engine
        .state
        .get_block(nu_script_config.run_closure.block_id);
    let signature = &block.signature;
    let total_positional =
        signature.required_positional.len() + signature.optional_positional.len();
    if total_positional != 2 {
        return Err(format!(
            "Closure must accept exactly 2 params (frame, state) -- got {total_positional}"
        )
        .into());
    }

    let span = nu_protocol::Span::unknown();
    let initial_state = match initial_json {
        Some(json) => crate::nu::util::json_to_value(&json, span),
        // The state parameter is the second positional. Required parameters precede
        // optional ones, so when it is optional it is always the LAST optional
        // positional. Indexing `[0]` would pick the frame parameter's default when
        // both parameters are optional.
        None => signature
            .optional_positional
            .last()
            .and_then(|state_param| state_param.default_value.clone())
            .unwrap_or_else(|| Value::nothing(span)),
    };

    let engine_worker = Arc::new(EngineWorker::new(
        engine,
        nu_script_config.run_closure,
        initial_state,
    ));

    Ok(Self {
        id,
        topic,
        config: actor_config,
        engine_worker,
        output,
    })
}
/// Sends a copy of `frame` to the engine worker and waits for the closure's result.
async fn eval_in_thread(&self, frame: &Frame) -> Result<ClosureResult, Error> {
    let owned_frame = frame.clone();
    self.engine_worker.eval(owned_frame).await
}
/// Evaluates the closure for `frame` and appends everything it produced to the store.
///
/// Two sources of output are appended, in this order:
/// 1. frames collected in the shared output buffer during the evaluation;
/// 2. one frame built from the closure's `out` value, unless that value is nothing or
///    is itself a frame record already stamped with this actor's id.
///
/// The `out` frame's topic is the actor topic plus `return_options.suffix` (default
/// `.out`). With `return_options.target == "cas"` the value is stored in CAS and the
/// frame carries its hash; otherwise the value must be a record and becomes the meta.
///
/// Every appended frame gets `actor_id` and `frame_id` added to its meta. Results of
/// `store.append` are ignored, matching the rest of this file.
///
/// Returns `Ok(true)` to keep serving and `Ok(false)` when the closure asked to stop.
///
/// # Errors
///
/// Returns an error when the evaluation fails, the CAS insert fails, a non-record `out`
/// value is returned without the `cas` target, or an output frame's meta is not a JSON
/// object. Frames appended before the failing one stay appended.
#[instrument(
    level = "info",
    skip(self, frame, store),
    fields(
        message = %format!(
            "actor={actor_id}:{topic} frame={frame_id}:{frame_topic}",
            actor_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
    )
)]
async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<bool, Error> {
    let (output, should_continue) = match self.eval_in_thread(frame).await? {
        ClosureResult::Continue { output, .. } => (output, true),
        ClosureResult::Stop { output } => (output, false),
    };

    let additional_frame = match output {
        Some(ref value)
            if !is_value_an_append_frame_from_actor(value, &self.id)
                && !matches!(value, Value::Nothing { .. }) =>
        {
            let return_options = self.config.return_options.as_ref();
            let suffix = return_options
                .and_then(|ro| ro.suffix.as_deref())
                .unwrap_or(".out");
            let use_cas = return_options
                .and_then(|ro| ro.target.as_deref())
                .is_some_and(|t| t == "cas");
            let topic = format!("{topic}{suffix}", topic = self.topic);
            let ttl = return_options.and_then(|ro| ro.ttl.clone());

            if use_cas {
                // Binary values are stored verbatim; everything else as its JSON text.
                let hash = match value {
                    Value::Binary { val, .. } => store.cas_insert(val).await?,
                    _ => store.cas_insert(&value_to_json(value).to_string()).await?,
                };
                Some(
                    Frame::builder(topic)
                        .maybe_ttl(ttl)
                        .maybe_hash(Some(hash))
                        .build(),
                )
            } else {
                match value {
                    Value::Record { .. } => {
                        let json = value_to_json(value);
                        Some(Frame::builder(topic).maybe_ttl(ttl).meta(json).build())
                    }
                    _ => {
                        return Err(format!(
                            "Actor output must be a record when target is not \"cas\"; got {}. \
                             Set return_options.target to \"cas\" for non-record output.",
                            value.get_type()
                        )
                        .into());
                    }
                }
            }
        }
        _ => None,
    };

    // Take the buffered frames inside a short scope so the guard is released before
    // anything is appended.
    let output_to_process: Vec<_> = {
        let mut output = self.output.lock().unwrap();
        output.drain(..).chain(additional_frame).collect()
    };

    for mut output_frame in output_to_process {
        // A non-object meta cannot be stamped. Report it as an error so `serve` can
        // emit `.unregistered` instead of panicking inside the serving task.
        let meta_obj = output_frame
            .meta
            .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
            .as_object_mut()
            .ok_or("Actor output frame meta must be a JSON object")?;
        meta_obj.insert(
            "actor_id".to_string(),
            serde_json::Value::String(self.id.to_string()),
        );
        meta_obj.insert(
            "frame_id".to_string(),
            serde_json::Value::String(frame.id.to_string()),
        );
        let _ = store.append(output_frame);
    }

    Ok(should_continue)
}
/// Main loop: reads frames from the store and feeds them to `process_frame` until the
/// actor is replaced, unregistered, asks to stop, or fails.
///
/// On every exit path except the read channel closing, a `<topic>.unregistered` frame
/// is appended whose meta names this actor and the frame that ended the loop (plus an
/// `error` string when `process_frame` failed).
async fn serve(&mut self, store: &Store, options: ReadOptions) {
    let register_topic = format!("{topic}.register", topic = self.topic);
    let unregister_topic = format!("{topic}.unregister", topic = self.topic);
    let own_id = self.id.to_string();

    let mut recver = store.read(options).await;
    while let Some(frame) = recver.recv().await {
        let is_lifecycle = frame.topic == register_topic || frame.topic == unregister_topic;

        // Each branch either skips the frame (`continue`) or yields the meta for the
        // final `.unregistered` frame.
        let stop_meta = if is_lifecycle {
            // Registration activity at or before our own registration is not for us.
            if frame.id <= self.id {
                continue;
            }
            serde_json::json!({
                "actor_id": own_id,
                "frame_id": frame.id.to_string(),
            })
        } else {
            // Never react to frames this actor emitted itself.
            let emitted_by_self = frame
                .meta
                .as_ref()
                .and_then(|meta| meta.get("actor_id"))
                .and_then(|actor_id| actor_id.as_str())
                == Some(own_id.as_str());
            if emitted_by_self {
                continue;
            }

            match self.process_frame(&frame, store).await {
                Ok(true) => continue,
                Ok(false) => serde_json::json!({
                    "actor_id": own_id,
                    "frame_id": frame.id.to_string(),
                }),
                Err(err) => serde_json::json!({
                    "actor_id": own_id,
                    "frame_id": frame.id.to_string(),
                    "error": err.to_string(),
                }),
            }
        };

        let _ = store.append(
            Frame::builder(format!("{topic}.unregistered", topic = self.topic))
                .meta(stop_meta)
                .build(),
        );
        break;
    }
}
/// Starts serving on a background tokio task, then announces the actor by appending a
/// `<topic>.active` frame whose meta holds the actor id and its start setting.
///
/// The serving task runs on a clone of this actor. The append result is ignored and
/// this function always returns `Ok(())`.
pub async fn spawn(&self, store: Store) -> Result<(), Error> {
    let options = self.configure_read_options().await;

    let mut actor = self.clone();
    let serve_store = store.clone();
    tokio::spawn(async move {
        actor.serve(&serve_store, options).await;
    });

    let active_frame = Frame::builder(format!("{topic}.active", topic = &self.topic))
        .meta(serde_json::json!({
            "actor_id": self.id.to_string(),
            "start": self.config.start,
        }))
        .build();
    let _ = store.append(active_frame);

    Ok(())
}
pub async fn from_frame(frame: &Frame, store: &Store) -> Result<Self, Error> {
let topic = frame
.topic
.strip_suffix(".register")
.ok_or("Frame topic must end with .register")?;
let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
let mut reader = store
.cas_reader(hash.clone())
.await
.map_err(|e| format!("Failed to get cas reader: {e}"))?;
let mut expression = String::new();
reader
.read_to_string(&mut expression)
.await
.map_err(|e| format!("Failed to read expression: {e}"))?;
let engine = crate::processor::build_engine(store, &frame.id)?;
let actor = Actor::new(
frame.id,
topic.to_string(),
engine,
expression,
store.clone(),
)
.await?;
Ok(actor)
}
/// Translates the actor's configuration into store read options.
///
/// `start` decides the `new` flag and the `after` id; `pulse` decides whether the
/// follow mode carries a heartbeat.
async fn configure_read_options(&self) -> ReadOptions {
    let follow_option = match self.config.pulse {
        Some(millis) => FollowOption::WithHeartbeat(Duration::from_millis(millis)),
        None => FollowOption::On,
    };

    let is_new = matches!(self.config.start, Start::New);
    let after = match &self.config.start {
        Start::After(id) => Some(*id),
        Start::First | Start::New => None,
    };

    ReadOptions::builder()
        .follow(follow_option)
        .new(is_new)
        .maybe_after(after)
        .build()
}
}
use tokio::sync::{mpsc, oneshot};
/// Handle to a dedicated OS thread that owns the Nushell engine and the actor's state,
/// evaluating the closure for one frame at a time.
pub struct EngineWorker {
// Queue of pending evaluations; the worker thread's loop ends once every sender has
// been dropped.
work_tx: mpsc::Sender<WorkItem>,
}
/// One evaluation request sent to the worker thread.
struct WorkItem {
// Frame passed to the closure as its first argument.
frame: Frame,
// One-shot channel on which the worker sends back the evaluation result.
resp_tx: oneshot::Sender<Result<ClosureResult, Error>>,
}
impl EngineWorker {
pub fn new(
engine: nu::Engine,
closure: nu_protocol::engine::Closure,
initial_state: Value,
) -> Self {
let (work_tx, mut work_rx) = mpsc::channel(32);
std::thread::spawn(move || {
let mut engine = engine;
let mut state = initial_state;
while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
let frame_val =
crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown(), false);
let pipeline = engine.run_closure_in_job(
&closure,
vec![frame_val, state.clone()],
None,
format!("actor {topic}", topic = frame.topic),
);
let result = pipeline
.map_err(|e| {
let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
})
.and_then(|pd| {
pd.into_value(nu_protocol::Span::unknown())
.map_err(Error::from)
})
.and_then(interpret_closure_result);
if let Ok(ClosureResult::Continue { ref next_state, .. }) = result {
state = next_state.clone();
}
let _ = resp_tx.send(result);
}
});
Self { work_tx }
}
pub async fn eval(&self, frame: Frame) -> Result<ClosureResult, Error> {
let (resp_tx, resp_rx) = oneshot::channel();
let work_item = WorkItem { frame, resp_tx };
self.work_tx
.send(work_item)
.await
.map_err(|_| Error::from("Engine worker thread has terminated"))?;
resp_rx
.await
.map_err(|_| Error::from("Engine worker thread has terminated"))?
}
}
/// Turns the value returned by the actor closure into a `ClosureResult`.
///
/// * nothing: `Stop` without output;
/// * a record whose keys are limited to `out` and `next`: `Continue` when `next` is
///   present, otherwise `Stop`; `out`, if present, becomes the output;
/// * a record with any other key, or any other kind of value: an error.
fn interpret_closure_result(value: Value) -> Result<ClosureResult, Error> {
    let record = match &value {
        Value::Nothing { .. } => return Ok(ClosureResult::Stop { output: None }),
        Value::Record { val, .. } => val,
        other => {
            return Err(format!(
                "Closure must return a record with 'out' and/or 'next' keys, or nothing; got {}",
                other.get_type()
            )
            .into());
        }
    };

    // Reject the first key that is neither `out` nor `next`.
    if let Some(key) = record
        .columns()
        .find(|key| *key != "out" && *key != "next")
    {
        return Err(format!(
            "Unexpected key '{key}' in closure return record; only 'out' and 'next' are allowed"
        )
        .into());
    }

    let output = record.get("out").cloned();
    Ok(match record.get("next").cloned() {
        Some(next_state) => ClosureResult::Continue { output, next_state },
        None => ClosureResult::Stop { output },
    })
}
/// Reports whether `value` is a frame record already stamped with this actor's id.
///
/// That is: a record with both `id` and `topic` fields whose `meta` field is a record
/// containing an `actor_id` string equal to `actor_id`.
fn is_value_an_append_frame_from_actor(value: &Value, actor_id: &Scru128Id) -> bool {
    let Ok(record) = value.as_record() else {
        return false;
    };
    if record.get("id").is_none() || record.get("topic").is_none() {
        return false;
    }

    let stamped_id = record
        .get("meta")
        .and_then(|meta| meta.as_record().ok())
        .and_then(|meta_record| meta_record.get("actor_id"))
        .and_then(|id| id.as_str().ok());

    match stamped_id {
        Some(id) => id == actor_id.to_string(),
        None => false,
    }
}
fn extract_actor_config(
script_config: &NuScriptConfig,
) -> Result<(ActorConfig, Option<serde_json::Value>), Error> {
let script_options: ActorScriptOptions = script_config.deserialize_options()?;
let start =
match script_options.start.as_deref() {
Some("first") => Start::First,
Some("new") => Start::New,
Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
format!("Invalid scru128 ID for start: {id_str}").into()
})?),
None => Start::default(), };
Ok((
ActorConfig {
start,
pulse: script_options.pulse,
return_options: script_options.return_options,
},
script_options.initial,
))
}