xs/handlers/
handler.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
/// A script-backed handler bound to a single topic.
///
/// The engine worker and the output buffer sit behind `Arc`, so clones of a
/// `Handler` share both.
#[derive(Clone)]
pub struct Handler {
    /// Identifier of this handler; `from_frame` uses the id of the registering frame.
    pub id: Scru128Id,
    /// Context the handler reads from and that is stamped onto every frame it emits.
    pub context_id: Scru128Id,
    /// Base topic; lifecycle and return frames are published under `<topic>.<suffix>`.
    pub topic: String,
    /// Settings resolved from the handler script's options.
    config: HandlerConfig,
    /// Worker that evaluates the script closure for each incoming frame.
    engine_worker: Arc<EngineWorker>,
    /// Buffer shared with the script's buffered append command (presumably filled
    /// by it during evaluation); drained after every evaluation.
    output: Arc<Mutex<Vec<Frame>>>,
}
31
/// Handler settings resolved from the options declared in the script.
#[derive(Clone, Debug)]
struct HandlerConfig {
    /// Where in the stream the handler starts reading.
    resume_from: ResumeFrom,
    /// Heartbeat interval in milliseconds; `None` follows the stream without heartbeats.
    pulse: Option<u64>,
    /// Optional overrides (topic suffix, TTL) for the frame built from the closure's
    /// return value.
    return_options: Option<ReturnOptions>,
}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41enum ResumeFrom {
42    Head,
43    Tail,
44    After(Scru128Id),
45}
46
47impl Default for ResumeFrom {
48    fn default() -> Self {
49        Self::Tail
50    }
51}
52
/// Options that can be deserialized directly from a script.
///
/// Every field is optional; `extract_handler_config` turns these raw values
/// into a `HandlerConfig`.
#[derive(Deserialize, Debug, Default)]
#[serde(default)] // Use default values when fields are missing
struct HandlerScriptOptions {
    /// Handler can specify where to resume from: "head", "tail", or a specific
    /// SCRU128 frame ID. When unset, `ResumeFrom::default()` is used.
    resume_from: Option<String>,
    /// Optional heartbeat interval in milliseconds
    pulse: Option<u64>,
    /// Optional customizations for return frames
    return_options: Option<ReturnOptions>,
}
64
65impl Handler {
66    pub async fn new(
67        id: Scru128Id,
68        context_id: Scru128Id,
69        topic: String,
70        mut engine: nu::Engine,
71        expression: String,
72        store: Store,
73    ) -> Result<Self, Error> {
74        let output = Arc::new(Mutex::new(Vec::new()));
75        engine.add_commands(vec![
76            Box::new(commands::cat_command::CatCommand::new(
77                store.clone(),
78                context_id,
79            )),
80            Box::new(commands::head_command::HeadCommand::new(
81                store.clone(),
82                context_id,
83            )),
84            Box::new(commands::append_command_buffered::AppendCommand::new(
85                store.clone(),
86                output.clone(),
87            )),
88        ])?;
89
90        // Parse configuration using the new generic API
91        let nu_script_config = nu::parse_config(&mut engine, &expression)?;
92
93        // Deserialize handler-specific options from the full_config_value
94        let handler_config = extract_handler_config(&nu_script_config)?;
95
96        // Validate the closure signature
97        let block = engine
98            .state
99            .get_block(nu_script_config.run_closure.block_id);
100        if block.signature.required_positional.len() != 1 {
101            return Err(format!(
102                "Closure must accept exactly one frame argument, found {count}",
103                count = block.signature.required_positional.len()
104            )
105            .into());
106        }
107
108        let engine_worker = Arc::new(EngineWorker::new(engine, nu_script_config.run_closure));
109
110        Ok(Self {
111            id,
112            context_id,
113            topic,
114            config: handler_config,
115            engine_worker,
116            output,
117        })
118    }
119
120    pub async fn eval_in_thread(&self, frame: &crate::store::Frame) -> Result<Value, Error> {
121        self.engine_worker.eval(frame.clone()).await
122    }
123
124    #[instrument(
125        level = "info",
126        skip(self, frame, store),
127        fields(
128            message = %format!(
129                "handler={handler_id}:{topic} frame={frame_id}:{frame_topic}",
130                handler_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
131        )
132    )]
133    async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<(), Error> {
134        let frame_clone = frame.clone();
135
136        let value = self.eval_in_thread(&frame_clone).await?;
137
138        // Check if the evaluated value is an append frame
139        let additional_frame = if !is_value_an_append_frame_from_handler(&value, &self.id)
140            && !matches!(value, Value::Nothing { .. })
141        {
142            let return_options = self.config.return_options.as_ref();
143            let suffix = return_options
144                .and_then(|ro| ro.suffix.as_deref())
145                .unwrap_or(".out");
146
147            let hash = match &value {
148                Value::Binary { val, .. } => {
149                    // Store binary data directly
150                    store.cas_insert(val).await?
151                }
152                _ => {
153                    // Store as JSON string (existing path)
154                    store.cas_insert(&value_to_json(&value).to_string()).await?
155                }
156            };
157            Some(
158                Frame::builder(
159                    format!("{topic}{suffix}", topic = self.topic, suffix = suffix),
160                    self.context_id,
161                )
162                .maybe_ttl(return_options.and_then(|ro| ro.ttl.clone()))
163                .maybe_hash(Some(hash))
164                .build(),
165            )
166        } else {
167            None
168        };
169
170        // Process buffered appends and the additional frame
171        let output_to_process: Vec<_> = {
172            let mut output = self.output.lock().unwrap();
173            output
174                .drain(..)
175                .chain(additional_frame.into_iter())
176                .collect()
177        };
178
179        for mut output_frame in output_to_process {
180            let meta_obj = output_frame
181                .meta
182                .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
183                .as_object_mut()
184                .expect("meta should be an object");
185
186            meta_obj.insert(
187                "handler_id".to_string(),
188                serde_json::Value::String(self.id.to_string()),
189            );
190            meta_obj.insert(
191                "frame_id".to_string(),
192                serde_json::Value::String(frame.id.to_string()),
193            );
194
195            // scope the handler's output to the handler's context
196            output_frame.context_id = self.context_id;
197            let _ = store.append(output_frame);
198        }
199
200        Ok(())
201    }
202
203    async fn serve(&mut self, store: &Store, options: ReadOptions) {
204        let mut recver = store.read(options).await;
205
206        while let Some(frame) = recver.recv().await {
207            // Skip registration activity that occurred before this handler was registered
208            if (frame.topic == format!("{topic}.register", topic = self.topic)
209                || frame.topic == format!("{topic}.unregister", topic = self.topic))
210                && frame.id <= self.id
211            {
212                continue;
213            }
214
215            if frame.topic == format!("{topic}.register", topic = &self.topic)
216                || frame.topic == format!("{topic}.unregister", topic = &self.topic)
217            {
218                let _ = store.append(
219                    Frame::builder(
220                        format!("{topic}.unregistered", topic = &self.topic),
221                        self.context_id,
222                    )
223                    .meta(serde_json::json!({
224                        "handler_id": self.id.to_string(),
225                        "frame_id": frame.id.to_string(),
226                    }))
227                    .build(),
228                );
229                break;
230            }
231
232            // Skip frames that were generated by this handler
233            if frame
234                .meta
235                .as_ref()
236                .and_then(|meta| meta.get("handler_id"))
237                .and_then(|handler_id| handler_id.as_str())
238                .filter(|handler_id| *handler_id == self.id.to_string())
239                .is_some()
240            {
241                continue;
242            }
243
244            if let Err(err) = self.process_frame(&frame, store).await {
245                let _ = store.append(
246                    Frame::builder(
247                        format!("{topic}.unregistered", topic = self.topic),
248                        self.context_id,
249                    )
250                    .meta(serde_json::json!({
251                        "handler_id": self.id.to_string(),
252                        "frame_id": frame.id.to_string(),
253                        "error": err.to_string(),
254                    }))
255                    .build(),
256                );
257                break;
258            }
259        }
260    }
261
262    pub async fn spawn(&self, store: Store) -> Result<(), Error> {
263        let options = self.configure_read_options().await;
264
265        {
266            let store = store.clone();
267            let options = options.clone();
268            let mut handler = self.clone();
269
270            tokio::spawn(async move {
271                handler.serve(&store, options).await;
272            });
273        }
274
275        let _ = store.append(
276            Frame::builder(
277                format!("{topic}.active", topic = &self.topic),
278                self.context_id,
279            )
280            .meta(serde_json::json!({
281                "handler_id": self.id.to_string(),
282                "tail": options.tail,
283                "last_id": options.last_id.map(|id| id.to_string()),
284            }))
285            .build(),
286        );
287
288        Ok(())
289    }
290
291    pub async fn from_frame(
292        frame: &Frame,
293        store: &Store,
294        engine: nu::Engine,
295    ) -> Result<Self, Error> {
296        let topic = frame
297            .topic
298            .strip_suffix(".register")
299            .ok_or("Frame topic must end with .register")?;
300
301        // Get hash and read expression
302        let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
303        let mut reader = store
304            .cas_reader(hash.clone())
305            .await
306            .map_err(|e| format!("Failed to get cas reader: {e}"))?;
307
308        let mut expression = String::new();
309        reader
310            .read_to_string(&mut expression)
311            .await
312            .map_err(|e| format!("Failed to read expression: {e}"))?;
313
314        let handler = Handler::new(
315            frame.id,
316            frame.context_id,
317            topic.to_string(),
318            engine,
319            expression,
320            store.clone(),
321        )
322        .await?;
323
324        Ok(handler)
325    }
326
327    async fn configure_read_options(&self) -> ReadOptions {
328        // Determine last_id and tail flag based on ResumeFrom
329        let (last_id, is_tail) = match &self.config.resume_from {
330            ResumeFrom::Head => (None, false),
331            ResumeFrom::Tail => (None, true),
332            ResumeFrom::After(id) => (Some(*id), false),
333        };
334
335        // Configure follow option based on pulse setting
336        let follow_option = self
337            .config
338            .pulse
339            .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
340            .unwrap_or(FollowOption::On);
341
342        ReadOptions::builder()
343            .follow(follow_option)
344            .tail(is_tail)
345            .maybe_last_id(last_id)
346            .context_id(self.context_id)
347            .build()
348    }
349}
350
351use tokio::sync::{mpsc, oneshot};
352
/// Handle to a dedicated OS thread that owns the script engine and evaluates
/// the handler closure one frame at a time.
pub struct EngineWorker {
    /// Sending side of the work queue; the worker thread's loop ends once the
    /// channel is closed.
    work_tx: mpsc::Sender<WorkItem>,
}
356
/// One evaluation request sent to the worker thread.
struct WorkItem {
    /// Frame passed to the closure as its argument.
    frame: Frame,
    /// One-shot channel on which the worker sends back the evaluation result.
    resp_tx: oneshot::Sender<Result<Value, Error>>,
}
361
362impl EngineWorker {
363    pub fn new(engine: nu::Engine, closure: nu_protocol::engine::Closure) -> Self {
364        let (work_tx, mut work_rx) = mpsc::channel(32);
365
366        std::thread::spawn(move || {
367            let mut engine = engine;
368
369            while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
370                let arg_val = crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown());
371
372                let pipeline = engine.run_closure_in_job(
373                    &closure,
374                    Some(arg_val), // The frame value for the closure's argument
375                    None,          // No separate $in pipeline
376                    format!("handler {topic}", topic = frame.topic),
377                );
378
379                let output = pipeline
380                    .map_err(|e| {
381                        let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
382                        Error::from(nu_protocol::format_cli_error(&working_set, &*e, None))
383                    })
384                    .and_then(|pd| {
385                        pd.into_value(nu_protocol::Span::unknown())
386                            .map_err(Error::from)
387                    });
388
389                let _ = resp_tx.send(output);
390            }
391        });
392
393        Self { work_tx }
394    }
395
396    pub async fn eval(&self, frame: Frame) -> Result<Value, Error> {
397        let (resp_tx, resp_rx) = oneshot::channel();
398        let work_item = WorkItem { frame, resp_tx };
399
400        self.work_tx
401            .send(work_item)
402            .await
403            .map_err(|_| Error::from("Engine worker thread has terminated"))?;
404
405        resp_rx
406            .await
407            .map_err(|_| Error::from("Engine worker thread has terminated"))?
408    }
409}
410
411fn is_value_an_append_frame_from_handler(value: &Value, handler_id: &Scru128Id) -> bool {
412    value
413        .as_record()
414        .ok()
415        .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
416        .and_then(|record| record.get("meta"))
417        .and_then(|meta| meta.as_record().ok())
418        .and_then(|meta_record| meta_record.get("handler_id"))
419        .and_then(|id| id.as_str().ok())
420        .filter(|id| *id == handler_id.to_string())
421        .is_some()
422}
423
424/// Extract handler-specific configuration from the generic NuScriptConfig
425fn extract_handler_config(script_config: &NuScriptConfig) -> Result<HandlerConfig, Error> {
426    // Deserialize the handler script options using the new deserialize_options method
427    let script_options: HandlerScriptOptions = script_config.deserialize_options()?;
428
429    // Process resume_from into the proper enum
430    let resume_from = match script_options.resume_from.as_deref() {
431        Some("head") => ResumeFrom::Head,
432        Some("tail") => ResumeFrom::Tail,
433        Some(id_str) => ResumeFrom::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
434            format!("Invalid scru128 ID for resume_from: {id_str}").into()
435        })?),
436        None => ResumeFrom::default(), // Default if not specified in script
437    };
438
439    // Build and return the HandlerConfig
440    Ok(HandlerConfig {
441        resume_from,
442        pulse: script_options.pulse,
443        return_options: script_options.return_options,
444    })
445}