xs/handlers/
handler.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
/// A registered handler: a nushell closure that is evaluated against frames
/// read from the store, plus the state needed to publish what it produces.
#[derive(Clone)]
pub struct Handler {
    /// Identifier of this handler; `from_frame` uses the id of the registering frame.
    pub id: Scru128Id,
    /// Base topic of the handler; `from_frame` derives it by stripping `.register`
    /// from the registering frame's topic.
    pub topic: String,
    /// Options extracted from the handler script (start position, pulse, return options).
    config: HandlerConfig,
    /// Worker that owns the nushell engine and evaluates the closure on its own thread.
    engine_worker: Arc<EngineWorker>,
    /// Buffer shared with the append command installed in `new`; drained by
    /// `process_frame` after each evaluation.
    output: Arc<Mutex<Vec<Frame>>>,
}
30
/// Validated handler settings, built by `extract_handler_config`.
#[derive(Clone, Debug)]
struct HandlerConfig {
    /// Where reading begins; translated into read options by `configure_read_options`.
    start: Start,
    /// Heartbeat interval in milliseconds; `None` means follow without heartbeats.
    pulse: Option<u64>,
    /// Optional overrides (topic suffix, ttl) for the frame built from the closure's return value.
    return_options: Option<ReturnOptions>,
}
37
/// Starting position for a handler's read, as consumed by `configure_read_options`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Start {
    /// Read with neither `new` nor `after` set — presumably from the oldest
    /// frame in the store (confirm against `ReadOptions`).
    First,
    /// Read with the `new` flag set. This is the default.
    #[default]
    New,
    /// Read with `after` set to the given frame id.
    After(Scru128Id),
}
46
/// Options that can be deserialized directly from a script.
///
/// This is the raw, loosely typed form; `extract_handler_config` converts it
/// into a `HandlerConfig` (in particular, it parses `start` into a `Start`).
#[derive(Deserialize, Debug, Default)]
#[serde(default)] // Use default values when fields are missing
struct HandlerScriptOptions {
    /// Handler can specify where to start: "first", "new", or a specific ID
    /// (any other string is parsed as a scru128 id).
    start: Option<String>,
    /// Optional heartbeat interval in milliseconds
    pulse: Option<u64>,
    /// Optional customizations for return frames
    return_options: Option<ReturnOptions>,
}
58
59impl Handler {
60    pub async fn new(
61        id: Scru128Id,
62        topic: String,
63        mut engine: nu::Engine,
64        expression: String,
65        store: Store,
66    ) -> Result<Self, Error> {
67        let output = Arc::new(Mutex::new(Vec::new()));
68        engine.add_commands(vec![
69            Box::new(commands::cat_command::CatCommand::new(store.clone())),
70            Box::new(commands::last_command::LastCommand::new(store.clone())),
71            Box::new(commands::append_command_buffered::AppendCommand::new(
72                store.clone(),
73                output.clone(),
74            )),
75        ])?;
76
77        // Parse configuration using the new generic API
78        let nu_script_config = nu::parse_config(&mut engine, &expression)?;
79
80        // Deserialize handler-specific options from the full_config_value
81        let handler_config = extract_handler_config(&nu_script_config)?;
82
83        // Validate the closure signature
84        let block = engine
85            .state
86            .get_block(nu_script_config.run_closure.block_id);
87        if block.signature.required_positional.len() != 1 {
88            return Err(format!(
89                "Closure must accept exactly one frame argument, found {count}",
90                count = block.signature.required_positional.len()
91            )
92            .into());
93        }
94
95        let engine_worker = Arc::new(EngineWorker::new(engine, nu_script_config.run_closure));
96
97        Ok(Self {
98            id,
99            topic,
100            config: handler_config,
101            engine_worker,
102            output,
103        })
104    }
105
106    pub async fn eval_in_thread(&self, frame: &crate::store::Frame) -> Result<Value, Error> {
107        self.engine_worker.eval(frame.clone()).await
108    }
109
110    #[instrument(
111        level = "info",
112        skip(self, frame, store),
113        fields(
114            message = %format!(
115                "handler={handler_id}:{topic} frame={frame_id}:{frame_topic}",
116                handler_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
117        )
118    )]
119    async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<(), Error> {
120        let frame_clone = frame.clone();
121
122        let value = self.eval_in_thread(&frame_clone).await?;
123
124        // Check if the evaluated value is an append frame
125        let additional_frame = if !is_value_an_append_frame_from_handler(&value, &self.id)
126            && !matches!(value, Value::Nothing { .. })
127        {
128            let return_options = self.config.return_options.as_ref();
129            let suffix = return_options
130                .and_then(|ro| ro.suffix.as_deref())
131                .unwrap_or(".out");
132
133            let hash = match &value {
134                Value::Binary { val, .. } => {
135                    // Store binary data directly
136                    store.cas_insert(val).await?
137                }
138                _ => {
139                    // Store as JSON string (existing path)
140                    store.cas_insert(&value_to_json(&value).to_string()).await?
141                }
142            };
143            Some(
144                Frame::builder(format!(
145                    "{topic}{suffix}",
146                    topic = self.topic,
147                    suffix = suffix
148                ))
149                .maybe_ttl(return_options.and_then(|ro| ro.ttl.clone()))
150                .maybe_hash(Some(hash))
151                .build(),
152            )
153        } else {
154            None
155        };
156
157        // Process buffered appends and the additional frame
158        let output_to_process: Vec<_> = {
159            let mut output = self.output.lock().unwrap();
160            output
161                .drain(..)
162                .chain(additional_frame.into_iter())
163                .collect()
164        };
165
166        for mut output_frame in output_to_process {
167            let meta_obj = output_frame
168                .meta
169                .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
170                .as_object_mut()
171                .expect("meta should be an object");
172
173            meta_obj.insert(
174                "handler_id".to_string(),
175                serde_json::Value::String(self.id.to_string()),
176            );
177            meta_obj.insert(
178                "frame_id".to_string(),
179                serde_json::Value::String(frame.id.to_string()),
180            );
181
182            let _ = store.append(output_frame);
183        }
184
185        Ok(())
186    }
187
188    async fn serve(&mut self, store: &Store, options: ReadOptions) {
189        let mut recver = store.read(options).await;
190
191        while let Some(frame) = recver.recv().await {
192            // Skip registration activity that occurred before this handler was registered
193            if (frame.topic == format!("{topic}.register", topic = self.topic)
194                || frame.topic == format!("{topic}.unregister", topic = self.topic))
195                && frame.id <= self.id
196            {
197                continue;
198            }
199
200            if frame.topic == format!("{topic}.register", topic = &self.topic)
201                || frame.topic == format!("{topic}.unregister", topic = &self.topic)
202            {
203                let _ = store.append(
204                    Frame::builder(format!("{topic}.unregistered", topic = &self.topic))
205                        .meta(serde_json::json!({
206                            "handler_id": self.id.to_string(),
207                            "frame_id": frame.id.to_string(),
208                        }))
209                        .build(),
210                );
211                break;
212            }
213
214            // Skip frames that were generated by this handler
215            if frame
216                .meta
217                .as_ref()
218                .and_then(|meta| meta.get("handler_id"))
219                .and_then(|handler_id| handler_id.as_str())
220                .filter(|handler_id| *handler_id == self.id.to_string())
221                .is_some()
222            {
223                continue;
224            }
225
226            if let Err(err) = self.process_frame(&frame, store).await {
227                let _ = store.append(
228                    Frame::builder(format!("{topic}.unregistered", topic = self.topic))
229                        .meta(serde_json::json!({
230                            "handler_id": self.id.to_string(),
231                            "frame_id": frame.id.to_string(),
232                            "error": err.to_string(),
233                        }))
234                        .build(),
235                );
236                break;
237            }
238        }
239    }
240
241    pub async fn spawn(&self, store: Store) -> Result<(), Error> {
242        let options = self.configure_read_options().await;
243
244        {
245            let store = store.clone();
246            let options = options.clone();
247            let mut handler = self.clone();
248
249            tokio::spawn(async move {
250                handler.serve(&store, options).await;
251            });
252        }
253
254        let _ = store.append(
255            Frame::builder(format!("{topic}.active", topic = &self.topic))
256                .meta(serde_json::json!({
257                    "handler_id": self.id.to_string(),
258                    "new": options.new,
259                    "after": options.after.map(|id| id.to_string()),
260                }))
261                .build(),
262        );
263
264        Ok(())
265    }
266
267    pub async fn from_frame(
268        frame: &Frame,
269        store: &Store,
270        engine: nu::Engine,
271    ) -> Result<Self, Error> {
272        let topic = frame
273            .topic
274            .strip_suffix(".register")
275            .ok_or("Frame topic must end with .register")?;
276
277        // Get hash and read expression
278        let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
279        let mut reader = store
280            .cas_reader(hash.clone())
281            .await
282            .map_err(|e| format!("Failed to get cas reader: {e}"))?;
283
284        let mut expression = String::new();
285        reader
286            .read_to_string(&mut expression)
287            .await
288            .map_err(|e| format!("Failed to read expression: {e}"))?;
289
290        let handler = Handler::new(
291            frame.id,
292            topic.to_string(),
293            engine,
294            expression,
295            store.clone(),
296        )
297        .await?;
298
299        Ok(handler)
300    }
301
302    async fn configure_read_options(&self) -> ReadOptions {
303        // Determine after and new flag based on Start
304        let (after, is_new) = match &self.config.start {
305            Start::First => (None, false),
306            Start::New => (None, true),
307            Start::After(id) => (Some(*id), false),
308        };
309
310        // Configure follow option based on pulse setting
311        let follow_option = self
312            .config
313            .pulse
314            .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
315            .unwrap_or(FollowOption::On);
316
317        ReadOptions::builder()
318            .follow(follow_option)
319            .new(is_new)
320            .maybe_after(after)
321            .build()
322    }
323}
324
325use tokio::sync::{mpsc, oneshot};
326
/// Handle to a dedicated OS thread that owns a nushell engine and evaluates a
/// closure against frames sent to it.
pub struct EngineWorker {
    /// Sending side of the work queue consumed by the worker thread.
    work_tx: mpsc::Sender<WorkItem>,
}
330
/// One evaluation request for the engine worker thread.
struct WorkItem {
    /// Frame passed to the closure as its argument.
    frame: Frame,
    /// One-shot channel on which the worker sends back the evaluation result.
    resp_tx: oneshot::Sender<Result<Value, Error>>,
}
335
336impl EngineWorker {
337    pub fn new(engine: nu::Engine, closure: nu_protocol::engine::Closure) -> Self {
338        let (work_tx, mut work_rx) = mpsc::channel(32);
339
340        std::thread::spawn(move || {
341            let mut engine = engine;
342
343            while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
344                let arg_val = crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown());
345
346                let pipeline = engine.run_closure_in_job(
347                    &closure,
348                    Some(arg_val), // The frame value for the closure's argument
349                    None,          // No separate $in pipeline
350                    format!("handler {topic}", topic = frame.topic),
351                );
352
353                let output = pipeline
354                    .map_err(|e| {
355                        let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
356                        Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
357                    })
358                    .and_then(|pd| {
359                        pd.into_value(nu_protocol::Span::unknown())
360                            .map_err(Error::from)
361                    });
362
363                let _ = resp_tx.send(output);
364            }
365        });
366
367        Self { work_tx }
368    }
369
370    pub async fn eval(&self, frame: Frame) -> Result<Value, Error> {
371        let (resp_tx, resp_rx) = oneshot::channel();
372        let work_item = WorkItem { frame, resp_tx };
373
374        self.work_tx
375            .send(work_item)
376            .await
377            .map_err(|_| Error::from("Engine worker thread has terminated"))?;
378
379        resp_rx
380            .await
381            .map_err(|_| Error::from("Engine worker thread has terminated"))?
382    }
383}
384
385fn is_value_an_append_frame_from_handler(value: &Value, handler_id: &Scru128Id) -> bool {
386    value
387        .as_record()
388        .ok()
389        .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
390        .and_then(|record| record.get("meta"))
391        .and_then(|meta| meta.as_record().ok())
392        .and_then(|meta_record| meta_record.get("handler_id"))
393        .and_then(|id| id.as_str().ok())
394        .filter(|id| *id == handler_id.to_string())
395        .is_some()
396}
397
398/// Extract handler-specific configuration from the generic NuScriptConfig
399fn extract_handler_config(script_config: &NuScriptConfig) -> Result<HandlerConfig, Error> {
400    // Deserialize the handler script options using the new deserialize_options method
401    let script_options: HandlerScriptOptions = script_config.deserialize_options()?;
402
403    // Process start into the proper enum
404    let start =
405        match script_options.start.as_deref() {
406            Some("first") => Start::First,
407            Some("new") => Start::New,
408            Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
409                format!("Invalid scru128 ID for start: {id_str}").into()
410            })?),
411            None => Start::default(), // Default if not specified in script
412        };
413
414    // Build and return the HandlerConfig
415    Ok(HandlerConfig {
416        start,
417        pulse: script_options.pulse,
418        return_options: script_options.return_options,
419    })
420}