Skip to main content

xs/
api.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use chrono::{DateTime, Utc};
6use scru128::Scru128Id;
7
8use base64::Engine;
9
10use tokio::io::AsyncWriteExt;
11use tokio_stream::wrappers::ReceiverStream;
12use tokio_stream::StreamExt;
13use tokio_util::io::ReaderStream;
14
15use http_body_util::StreamBody;
16use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
17use hyper::body::Bytes;
18use hyper::header::ACCEPT;
19use hyper::server::conn::http1;
20use hyper::service::service_fn;
21use hyper::{Method, Request, Response, StatusCode};
22use hyper_util::rt::TokioIo;
23
24use crate::listener::Listener;
25use crate::nu;
26use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
27
/// Boxed, thread-safe error used as the error type throughout this module.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Return type shared by every request handler: an HTTP response carrying a
/// boxed body, or a [`BoxError`].
type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
30
31/// Serialize a frame to JSON, optionally including a timestamp extracted from the scru128 ID
32fn serialize_frame(frame: &Frame, with_timestamp: bool) -> String {
33    if with_timestamp {
34        let mut value = serde_json::to_value(frame).unwrap();
35        if let serde_json::Value::Object(ref mut map) = value {
36            let millis = frame.id.timestamp() as i64;
37            let dt: DateTime<Utc> = DateTime::from_timestamp_millis(millis)
38                .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
39            map.insert("timestamp".to_string(), serde_json::json!(dt.to_rfc3339()));
40        }
41        serde_json::to_string(&value).unwrap()
42    } else {
43        serde_json::to_string(frame).unwrap()
44    }
45}
46
/// Response encoding for the stream-reading endpoint, selected from the
/// request's `Accept` header.
#[derive(Debug, PartialEq, Clone)]
enum AcceptType {
    /// Newline-delimited JSON (`application/x-ndjson`); used unless the
    /// client asks for an event stream.
    Ndjson,
    /// Server-sent events (`text/event-stream`).
    EventStream,
}
52
/// The outcome of routing a request: which handler to run, together with the
/// parameters already parsed from the path, query string and headers.
enum Routes {
    /// `GET /` — read the stream.
    StreamCat {
        accept_type: AcceptType,
        options: ReadOptions,
        with_timestamp: bool,
    },
    /// `POST /append/<topic>` — append a frame to `topic`.
    StreamAppend {
        topic: String,
        ttl: Option<TTL>,
        with_timestamp: bool,
    },
    /// `GET /last` or `GET /last/<topic>` — fetch the most recent frame(s).
    LastGet {
        topic: Option<String>,
        // Number of frames requested via `?last=`; defaults to 1.
        last: usize,
        follow: bool,
        with_timestamp: bool,
    },
    /// `GET /<id>` — fetch a single frame by its ID.
    StreamItemGet {
        id: Scru128Id,
        with_timestamp: bool,
    },
    /// `DELETE /<id>` — remove a single frame by its ID.
    StreamItemRemove(Scru128Id),
    /// `GET /cas/<integrity>` — read content from the content-addressed store.
    CasGet(ssri::Integrity),
    /// `POST /cas` — write the request body to the content-addressed store.
    CasPost,
    /// `POST /import` — insert a frame supplied as JSON.
    Import,
    /// `POST /eval` — evaluate a script supplied in the request body.
    Eval,
    /// `GET /version` — report the crate version.
    Version,
    /// No route matched.
    NotFound,
    /// A route matched but its parameters were invalid; carries the message
    /// returned to the client.
    BadRequest(String),
}
83
84/// Validates an Integrity object to ensure all its hashes are properly formatted
85fn validate_integrity(integrity: &ssri::Integrity) -> bool {
86    // Check if there are any hashes
87    if integrity.hashes.is_empty() {
88        return false;
89    }
90
91    // For each hash, check if it has a valid base64-encoded digest
92    for hash in &integrity.hashes {
93        // Check if digest is valid base64 using the modern API
94        if base64::engine::general_purpose::STANDARD
95            .decode(&hash.digest)
96            .is_err()
97        {
98            return false;
99        }
100    }
101
102    true
103}
104
105fn match_route(
106    method: &Method,
107    path: &str,
108    headers: &hyper::HeaderMap,
109    query: Option<&str>,
110) -> Routes {
111    let params: HashMap<String, String> =
112        url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
113            .into_owned()
114            .collect();
115
116    match (method, path) {
117        (&Method::GET, "/version") => Routes::Version,
118
119        (&Method::GET, "/") => {
120            let accept_type = match headers.get(ACCEPT) {
121                Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
122                _ => AcceptType::Ndjson,
123            };
124
125            let options = ReadOptions::from_query(query);
126            let with_timestamp = params.contains_key("with-timestamp");
127
128            match options {
129                Ok(options) => Routes::StreamCat {
130                    accept_type,
131                    options,
132                    with_timestamp,
133                },
134                Err(e) => Routes::BadRequest(e.to_string()),
135            }
136        }
137
138        (&Method::GET, "/last") => {
139            let follow = params.contains_key("follow");
140            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
141            let with_timestamp = params.contains_key("with-timestamp");
142            Routes::LastGet {
143                topic: None,
144                last,
145                follow,
146                with_timestamp,
147            }
148        }
149
150        (&Method::GET, p) if p.starts_with("/last/") => {
151            let topic = p.strip_prefix("/last/").unwrap().to_string();
152            let follow = params.contains_key("follow");
153            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
154            let with_timestamp = params.contains_key("with-timestamp");
155            Routes::LastGet {
156                topic: Some(topic),
157                last,
158                follow,
159                with_timestamp,
160            }
161        }
162
163        (&Method::GET, p) if p.starts_with("/cas/") => {
164            if let Some(hash) = p.strip_prefix("/cas/") {
165                match ssri::Integrity::from_str(hash) {
166                    Ok(integrity) => {
167                        if validate_integrity(&integrity) {
168                            Routes::CasGet(integrity)
169                        } else {
170                            Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
171                        }
172                    }
173                    Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
174                }
175            } else {
176                Routes::NotFound
177            }
178        }
179
180        (&Method::POST, "/cas") => Routes::CasPost,
181        (&Method::POST, "/import") => Routes::Import,
182        (&Method::POST, "/eval") => Routes::Eval,
183
184        (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
185            Ok(id) => {
186                let with_timestamp = params.contains_key("with-timestamp");
187                Routes::StreamItemGet { id, with_timestamp }
188            }
189            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
190        },
191
192        (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
193            Ok(id) => Routes::StreamItemRemove(id),
194            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
195        },
196
197        (&Method::POST, path) if path.starts_with("/append/") => {
198            let topic = path.strip_prefix("/append/").unwrap().to_string();
199            let with_timestamp = params.contains_key("with-timestamp");
200
201            match TTL::from_query(query) {
202                Ok(ttl) => Routes::StreamAppend {
203                    topic,
204                    ttl: Some(ttl),
205                    with_timestamp,
206                },
207                Err(e) => Routes::BadRequest(e.to_string()),
208            }
209        }
210
211        _ => Routes::NotFound,
212    }
213}
214
215async fn handle(
216    mut store: Store,
217    _engine: nu::Engine, // TODO: potentially vestigial, will .process come back?
218    req: Request<hyper::body::Incoming>,
219) -> HTTPResult {
220    let method = req.method();
221    let path = req.uri().path();
222    let headers = req.headers().clone();
223    let query = req.uri().query();
224
225    let res = match match_route(method, path, &headers, query) {
226        Routes::Version => handle_version().await,
227
228        Routes::StreamCat {
229            accept_type,
230            options,
231            with_timestamp,
232        } => handle_stream_cat(&mut store, options, accept_type, with_timestamp).await,
233
234        Routes::StreamAppend {
235            topic,
236            ttl,
237            with_timestamp,
238        } => handle_stream_append(&mut store, req, topic, ttl, with_timestamp).await,
239
240        Routes::CasGet(hash) => {
241            let reader = store.cas_reader(hash).await?;
242            let stream = ReaderStream::new(reader);
243
244            let stream = stream.map(|frame| {
245                let frame = frame.unwrap();
246                Ok(hyper::body::Frame::data(frame))
247            });
248
249            let body = StreamBody::new(stream).boxed();
250            Ok(Response::new(body))
251        }
252
253        Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
254
255        Routes::StreamItemGet { id, with_timestamp } => {
256            response_frame_or_404(store.get(&id), with_timestamp)
257        }
258
259        Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
260
261        Routes::LastGet {
262            topic,
263            last,
264            follow,
265            with_timestamp,
266        } => handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp).await,
267
268        Routes::Import => handle_import(&mut store, req.into_body()).await,
269
270        Routes::Eval => handle_eval(&store, req.into_body()).await,
271
272        Routes::NotFound => response_404(),
273        Routes::BadRequest(msg) => response_400(msg),
274    };
275
276    res.or_else(|e| response_500(e.to_string()))
277}
278
279async fn handle_stream_cat(
280    store: &mut Store,
281    options: ReadOptions,
282    accept_type: AcceptType,
283    with_timestamp: bool,
284) -> HTTPResult {
285    let rx = store.read(options).await;
286    let stream = ReceiverStream::new(rx);
287
288    let accept_type_clone = accept_type.clone();
289    let stream = stream.map(move |frame| {
290        let bytes = match accept_type_clone {
291            AcceptType::Ndjson => {
292                let mut encoded = serialize_frame(&frame, with_timestamp).into_bytes();
293                encoded.push(b'\n');
294                encoded
295            }
296            AcceptType::EventStream => format!(
297                "id: {id}\ndata: {data}\n\n",
298                id = frame.id,
299                data = serialize_frame(&frame, with_timestamp)
300            )
301            .into_bytes(),
302        };
303        Ok(hyper::body::Frame::data(Bytes::from(bytes)))
304    });
305
306    let body = StreamBody::new(stream).boxed();
307
308    let content_type = match accept_type {
309        AcceptType::Ndjson => "application/x-ndjson",
310        AcceptType::EventStream => "text/event-stream",
311    };
312
313    Ok(Response::builder()
314        .status(StatusCode::OK)
315        .header("Content-Type", content_type)
316        .body(body)?)
317}
318
319async fn handle_stream_append(
320    store: &mut Store,
321    req: Request<hyper::body::Incoming>,
322    topic: String,
323    ttl: Option<TTL>,
324    with_timestamp: bool,
325) -> HTTPResult {
326    let (parts, mut body) = req.into_parts();
327
328    let hash = {
329        let mut writer = store.cas_writer().await?;
330        let mut bytes_written = 0;
331
332        while let Some(frame) = body.frame().await {
333            if let Ok(data) = frame?.into_data() {
334                writer.write_all(&data).await?;
335                bytes_written += data.len();
336            }
337        }
338
339        if bytes_written > 0 {
340            Some(writer.commit().await?)
341        } else {
342            None
343        }
344    };
345
346    let meta = match parts
347        .headers
348        .get("xs-meta")
349        .map(|x| x.to_str())
350        .transpose()
351        .unwrap()
352        .map(|s| {
353            // First decode the Base64-encoded string
354            base64::prelude::BASE64_STANDARD
355                .decode(s)
356                .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
357                .and_then(|decoded| {
358                    // Then parse the decoded bytes as UTF-8 string
359                    String::from_utf8(decoded)
360                        .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
361                        .and_then(|json_str| {
362                            // Finally parse the UTF-8 string as JSON
363                            serde_json::from_str(&json_str)
364                                .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
365                        })
366                })
367        })
368        .transpose()
369    {
370        Ok(meta) => meta,
371        Err(e) => return response_400(e.to_string()),
372    };
373
374    let frame = store.append(
375        Frame::builder(topic)
376            .maybe_hash(hash)
377            .maybe_meta(meta)
378            .maybe_ttl(ttl)
379            .build(),
380    )?;
381
382    Ok(Response::builder()
383        .status(StatusCode::OK)
384        .header("Content-Type", "application/json")
385        .body(full(serialize_frame(&frame, with_timestamp)))?)
386}
387
388async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
389    let hash = {
390        let mut writer = store.cas_writer().await?;
391        let mut bytes_written = 0;
392
393        while let Some(frame) = body.frame().await {
394            if let Ok(data) = frame?.into_data() {
395                writer.write_all(&data).await?;
396                bytes_written += data.len();
397            }
398        }
399
400        if bytes_written == 0 {
401            return response_400("Empty body".to_string());
402        }
403
404        writer.commit().await?
405    };
406
407    Ok(Response::builder()
408        .status(StatusCode::OK)
409        .header("Content-Type", "text/plain")
410        .body(full(hash.to_string()))?)
411}
412
413async fn handle_version() -> HTTPResult {
414    let version = env!("CARGO_PKG_VERSION");
415    let version_info = serde_json::json!({ "version": version });
416    Ok(Response::builder()
417        .status(StatusCode::OK)
418        .header("Content-Type", "application/json")
419        .body(full(serde_json::to_string(&version_info).unwrap()))?)
420}
421
422pub async fn serve(
423    store: Store,
424    engine: nu::Engine,
425    expose: Option<String>,
426) -> Result<(), BoxError> {
427    let path = store.path.join("sock").to_string_lossy().to_string();
428    let listener = Listener::bind(&path).await?;
429
430    let mut listeners = vec![listener];
431    let mut expose_meta = None;
432
433    if let Some(expose) = expose {
434        let expose_listener = Listener::bind(&expose).await?;
435
436        // Check if this is an iroh listener and get the ticket
437        if let Some(ticket) = expose_listener.get_ticket() {
438            expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
439        } else {
440            expose_meta = Some(serde_json::json!({"expose": expose}));
441        }
442
443        listeners.push(expose_listener);
444    }
445
446    if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
447        tracing::error!("Failed to append xs.start frame: {}", e);
448    }
449
450    let mut tasks = Vec::new();
451    for listener in listeners {
452        let store = store.clone();
453        let engine = engine.clone();
454        let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
455        tasks.push(task);
456    }
457
458    // TODO: graceful shutdown and error handling
459    // Wait for all listener tasks to complete (or until the first error)
460    for task in tasks {
461        task.await??;
462    }
463
464    Ok(())
465}
466
467async fn listener_loop(
468    mut listener: Listener,
469    store: Store,
470    engine: nu::Engine,
471) -> Result<(), BoxError> {
472    loop {
473        let (stream, _) = listener.accept().await?;
474        let io = TokioIo::new(stream);
475        let store = store.clone();
476        let engine = engine.clone();
477        tokio::task::spawn(async move {
478            if let Err(err) = http1::Builder::new()
479                .serve_connection(
480                    io,
481                    service_fn(move |req| handle(store.clone(), engine.clone(), req)),
482                )
483                .await
484            {
485                // Match against the error kind to selectively ignore `NotConnected` errors
486                if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
487                    source
488                        .downcast_ref::<std::io::Error>()
489                        .map(|io_err| io_err.kind())
490                }) {
491                    // ignore the NotConnected error, hyper's way of saying the client disconnected
492                } else {
493                    // todo, Handle or log other errors
494                    tracing::error!("TBD: {:?}", err);
495                }
496            }
497        });
498    }
499}
500
501fn response_frame_or_404(frame: Option<store::Frame>, with_timestamp: bool) -> HTTPResult {
502    if let Some(frame) = frame {
503        Ok(Response::builder()
504            .status(StatusCode::OK)
505            .header("Content-Type", "application/json")
506            .body(full(serialize_frame(&frame, with_timestamp)))?)
507    } else {
508        response_404()
509    }
510}
511
512async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
513    match store.remove(&id) {
514        Ok(()) => Ok(Response::builder()
515            .status(StatusCode::NO_CONTENT)
516            .body(empty())?),
517        Err(e) => {
518            tracing::error!("Failed to remove item {}: {:?}", id, e);
519
520            Ok(Response::builder()
521                .status(StatusCode::INTERNAL_SERVER_ERROR)
522                .body(full("internal-error"))?)
523        }
524    }
525}
526
527async fn handle_last_get(
528    store: &Store,
529    topic: Option<&str>,
530    last: usize,
531    follow: bool,
532    with_timestamp: bool,
533) -> HTTPResult {
534    if !follow {
535        let options = ReadOptions::builder()
536            .last(last)
537            .maybe_topic(topic.map(|t| t.to_string()))
538            .build();
539
540        let frames: Vec<Frame> = store.read_sync(options).collect();
541
542        if frames.is_empty() {
543            return response_404();
544        }
545
546        // Format based on request, not result count:
547        // - last == 1 (default): JSON object
548        // - last > 1: NDJSON (even if fewer frames returned)
549        if last == 1 {
550            return response_frame_or_404(Some(frames.into_iter().next().unwrap()), with_timestamp);
551        }
552
553        let mut body = Vec::new();
554        for frame in frames {
555            body.extend(serialize_frame(&frame, with_timestamp).into_bytes());
556            body.push(b'\n');
557        }
558
559        return Ok(Response::builder()
560            .status(StatusCode::OK)
561            .header("Content-Type", "application/x-ndjson")
562            .body(full(body))?);
563    }
564
565    // Follow mode: use ReadOptions::last to get historical + live frames
566    // This emits xs.threshold after historical frames
567    let rx = store
568        .read(
569            ReadOptions::builder()
570                .last(last)
571                .maybe_topic(topic.map(|t| t.to_string()))
572                .follow(FollowOption::On)
573                .build(),
574        )
575        .await;
576
577    let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(move |frame| {
578        let mut bytes = serialize_frame(&frame, with_timestamp).into_bytes();
579        bytes.push(b'\n');
580        Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
581    });
582
583    Ok(Response::builder()
584        .status(StatusCode::OK)
585        .header("Content-Type", "application/x-ndjson")
586        .body(StreamBody::new(stream).boxed())?)
587}
588
589async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
590    let bytes = body.collect().await?.to_bytes();
591    let frame: Frame = match serde_json::from_slice(&bytes) {
592        Ok(frame) => frame,
593        Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
594    };
595
596    store.insert_frame(&frame)?;
597
598    Ok(Response::builder()
599        .status(StatusCode::OK)
600        .header("Content-Type", "application/json")
601        .body(full(serde_json::to_string(&frame).unwrap()))?)
602}
603
604fn response_404() -> HTTPResult {
605    Ok(Response::builder()
606        .status(StatusCode::NOT_FOUND)
607        .body(empty())?)
608}
609
610fn response_400(message: String) -> HTTPResult {
611    let body = full(message);
612    Ok(Response::builder()
613        .status(StatusCode::BAD_REQUEST)
614        .body(body)?)
615}
616
617fn response_500(message: String) -> HTTPResult {
618    let body = full(message);
619    Ok(Response::builder()
620        .status(StatusCode::INTERNAL_SERVER_ERROR)
621        .body(body)?)
622}
623
624fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
625    Full::new(chunk.into())
626        .map_err(|never| match never {})
627        .boxed()
628}
629
630fn empty() -> BoxBody<Bytes, BoxError> {
631    Empty::<Bytes>::new()
632        .map_err(|never| match never {})
633        .boxed()
634}
635
636/// Engine for an ad-hoc `.eval` script: the prepared base (Stream reads +
637/// Direct `.append`) plus the VFS modules registered so far, so eval scripts
638/// can `use` them, the same builtins the runners get.
639fn eval_engine(store: &Store) -> Result<nu::Engine, String> {
640    let mut engine = nu::prepared_base(store, nu::ReadMode::Stream, true)
641        .map_err(|e| format!("Failed to build nushell engine: {e}"))?;
642    // Modules registered up to now: a fresh time-ordered id exceeds every
643    // already-appended frame.
644    let modules = store.nu_modules_at(&scru128::new());
645    nu::load_modules(&mut engine.state, store, &modules)
646        .map_err(|e| format!("Failed to load modules: {e}"))?;
647    Ok(engine)
648}
649
650async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
651    // Read the script from the request body
652    let bytes = body.collect().await?.to_bytes();
653    let script =
654        String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
655
656    let engine = eval_engine(store)?;
657
658    // Execute the script
659    let result = engine
660        .eval(nu_protocol::PipelineData::empty(), script)
661        .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
662
663    // Format output based on PipelineData type according to spec
664    match result {
665        nu_protocol::PipelineData::ByteStream(stream, ..) => {
666            // ByteStream → raw bytes with proper streaming using channel pattern
667            if let Some(mut reader) = stream.reader() {
668                use std::io::Read;
669
670                let (tx, rx) = tokio::sync::mpsc::channel(16);
671
672                // Spawn sync task to read from nushell Reader and send to channel
673                std::thread::spawn(move || {
674                    let mut buffer = [0u8; 8192];
675                    loop {
676                        match reader.read(&mut buffer) {
677                            Ok(0) => break, // EOF
678                            Ok(n) => {
679                                let chunk = Bytes::copy_from_slice(&buffer[..n]);
680                                if tx
681                                    .blocking_send(Ok(hyper::body::Frame::data(chunk)))
682                                    .is_err()
683                                {
684                                    break;
685                                }
686                            }
687                            Err(e) => {
688                                let _ = tx.blocking_send(Err(
689                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>
690                                ));
691                                break;
692                            }
693                        }
694                    }
695                });
696
697                let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
698                let body = StreamBody::new(stream).boxed();
699                Ok(Response::builder()
700                    .status(StatusCode::OK)
701                    .header("Content-Type", "application/octet-stream")
702                    .body(body)?)
703            } else {
704                // No reader available, return empty response
705                Ok(Response::builder()
706                    .status(StatusCode::OK)
707                    .header("Content-Type", "application/octet-stream")
708                    .body(empty())?)
709            }
710        }
711        nu_protocol::PipelineData::ListStream(stream, ..) => {
712            // ListStream → JSONL stream with proper streaming using channel pattern
713            let (tx, rx) = tokio::sync::mpsc::channel(16);
714
715            // Spawn sync task to iterate stream and send JSONL to channel
716            std::thread::spawn(move || {
717                for value in stream.into_iter() {
718                    let json = nu::value_to_json(&value);
719                    match serde_json::to_vec(&json) {
720                        Ok(mut json_bytes) => {
721                            json_bytes.push(b'\n'); // Add newline for JSONL
722                            let chunk = Bytes::from(json_bytes);
723                            if tx
724                                .blocking_send(Ok(hyper::body::Frame::data(chunk)))
725                                .is_err()
726                            {
727                                break;
728                            }
729                        }
730                        Err(e) => {
731                            let _ = tx.blocking_send(Err(
732                                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
733                            ));
734                            break;
735                        }
736                    }
737                }
738            });
739
740            let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
741            let body = StreamBody::new(stream).boxed();
742            Ok(Response::builder()
743                .status(StatusCode::OK)
744                .header("Content-Type", "application/x-ndjson")
745                .body(body)?)
746        }
747        nu_protocol::PipelineData::Value(value, ..) => {
748            match &value {
749                nu_protocol::Value::String { .. }
750                | nu_protocol::Value::Int { .. }
751                | nu_protocol::Value::Float { .. }
752                | nu_protocol::Value::Bool { .. } => {
753                    // Single primitive value → raw text
754                    let text = match value {
755                        nu_protocol::Value::String { val, .. } => val.clone(),
756                        nu_protocol::Value::Int { val, .. } => val.to_string(),
757                        nu_protocol::Value::Float { val, .. } => val.to_string(),
758                        nu_protocol::Value::Bool { val, .. } => val.to_string(),
759                        _ => value.into_string().unwrap_or_else(|_| "".to_string()),
760                    };
761                    Ok(Response::builder()
762                        .status(StatusCode::OK)
763                        .header("Content-Type", "text/plain")
764                        .body(full(text))?)
765                }
766                _ => {
767                    // Single structured value → JSON
768                    let json = nu::value_to_json(&value);
769                    let json_string = serde_json::to_string(&json)
770                        .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
771                    Ok(Response::builder()
772                        .status(StatusCode::OK)
773                        .header("Content-Type", "application/json")
774                        .body(full(json_string))?)
775                }
776            }
777        }
778        nu_protocol::PipelineData::Empty => {
779            // Empty → nothing
780            Ok(Response::builder()
781                .status(StatusCode::NO_CONTENT)
782                .body(empty())?)
783        }
784    }
785}
786
787#[cfg(test)]
788mod tests {
789    use super::*;
790
791    #[test]
792    fn test_match_route_last() {
793        let headers = hyper::HeaderMap::new();
794
795        // /last/topic without follow
796        assert!(matches!(
797            match_route(&Method::GET, "/last/test", &headers, None),
798            Routes::LastGet { topic: Some(t), last: 1, follow: false, .. } if t == "test"
799        ));
800
801        // /last/topic with follow
802        assert!(matches!(
803            match_route(&Method::GET, "/last/test", &headers, Some("follow=true")),
804            Routes::LastGet { topic: Some(t), last: 1, follow: true, .. } if t == "test"
805        ));
806
807        // /last without topic
808        assert!(matches!(
809            match_route(&Method::GET, "/last", &headers, None),
810            Routes::LastGet {
811                topic: None,
812                last: 1,
813                follow: false,
814                ..
815            }
816        ));
817
818        // /last with last=5
819        assert!(matches!(
820            match_route(&Method::GET, "/last", &headers, Some("last=5")),
821            Routes::LastGet {
822                topic: None,
823                last: 5,
824                follow: false,
825                ..
826            }
827        ));
828
829        // /last/topic with last=3 and follow
830        assert!(matches!(
831            match_route(&Method::GET, "/last/test", &headers, Some("last=3&follow=true")),
832            Routes::LastGet { topic: Some(t), last: 3, follow: true, .. } if t == "test"
833        ));
834    }
835
836    #[tokio::test]
837    async fn test_handle_last_get_ndjson_format() {
838        use crate::store::Store;
839        use http_body_util::BodyExt;
840
841        let temp_dir = tempfile::tempdir().unwrap();
842        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
843
844        // Add one frame
845        store
846            .append(crate::store::Frame::builder("test").build())
847            .unwrap();
848
849        // Request last=1 (default) - should return JSON object (no trailing newline)
850        let response = handle_last_get(&store, Some("test"), 1, false, false)
851            .await
852            .unwrap();
853        let body = response.into_body().collect().await.unwrap().to_bytes();
854        let body_str = String::from_utf8(body.to_vec()).unwrap();
855        assert!(
856            !body_str.ends_with('\n'),
857            "Response should be JSON (no trailing newline) when last=1, got: {:?}",
858            body_str
859        );
860
861        // Request last=3 but only 1 frame exists - should still return NDJSON
862        let response = handle_last_get(&store, Some("test"), 3, false, false)
863            .await
864            .unwrap();
865        let body = response.into_body().collect().await.unwrap().to_bytes();
866        let body_str = String::from_utf8(body.to_vec()).unwrap();
867        assert!(
868            body_str.ends_with('\n'),
869            "Response should be NDJSON (end with newline) when last > 1, got: {:?}",
870            body_str
871        );
872
873        // Verify full chain: match_route -> dispatch -> handle_last_get
874        let headers = hyper::HeaderMap::new();
875        let route = match_route(&Method::GET, "/last/test", &headers, Some("last=3"));
876        if let Routes::LastGet {
877            topic,
878            last,
879            follow,
880            with_timestamp,
881        } = route
882        {
883            assert_eq!(last, 3, "Route should parse last=3");
884            let response = handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp)
885                .await
886                .unwrap();
887            let body = response.into_body().collect().await.unwrap().to_bytes();
888            let body_str = String::from_utf8(body.to_vec()).unwrap();
889            assert!(
890                body_str.ends_with('\n'),
891                "Full chain should return NDJSON when last=3, got: {:?}",
892                body_str
893            );
894        } else {
895            panic!("Expected Routes::LastGet");
896        }
897    }
898
899    #[tokio::test]
900    async fn test_handle_eval_logic() {
901        // Test the core nushell execution logic by testing the engine directly
902        use crate::nu::Engine;
903        use crate::store::Store;
904        use nu_protocol::PipelineData;
905
906        // Create a temporary store for testing
907        let temp_dir = tempfile::tempdir().unwrap();
908        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
909
910        // Create nushell engine with store helper commands
911        let mut engine = Engine::new().unwrap();
912
913        // Add core commands
914        crate::nu::add_core_commands(&mut engine, &store).unwrap();
915
916        // Add streaming commands
917        engine
918            .add_commands(vec![
919                Box::new(
920                    crate::nu::commands::cat_stream_command::CatStreamCommand::new(store.clone()),
921                ),
922                Box::new(
923                    crate::nu::commands::last_stream_command::LastStreamCommand::new(store.clone()),
924                ),
925                Box::new(crate::nu::commands::append_command::AppendCommand::new(
926                    store.clone(),
927                )),
928            ])
929            .unwrap();
930
931        // Test simple string expression
932        let result = engine
933            .eval(PipelineData::empty(), r#""hello world""#.to_string())
934            .unwrap();
935
936        match result {
937            PipelineData::Value(value, ..) => {
938                let text = value.into_string().unwrap();
939                assert_eq!(text, "hello world");
940            }
941            _ => panic!("Expected Value, got {:?}", result),
942        }
943
944        // Test simple math expression - result should be an integer
945        let result = engine
946            .eval(PipelineData::empty(), "2 + 3".to_string())
947            .unwrap();
948
949        match result {
950            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
951                assert_eq!(val, 5);
952            }
953            _ => panic!("Expected Int Value, got {:?}", result),
954        }
955    }
956
957    // An `.eval` script must be able to `use` a registered VFS module.
958    #[tokio::test]
959    async fn test_eval_uses_modules() {
960        use crate::store::{Frame, Store};
961        use nu_protocol::PipelineData;
962
963        let temp_dir = tempfile::tempdir().unwrap();
964        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
965
966        let hash = store
967            .cas_insert_sync(r#"export def add_nums [x, y] { $"sum is ($x + $y)" }"#)
968            .unwrap();
969        store
970            .append(Frame::builder("xs.module.mymod").hash(hash).build())
971            .unwrap();
972
973        let engine = super::eval_engine(&store).unwrap();
974        let result = engine
975            .eval(
976                PipelineData::empty(),
977                "use mymod; mymod add_nums 40 2".to_string(),
978            )
979            .unwrap()
980            .into_value(nu_protocol::Span::test_data())
981            .unwrap();
982        assert_eq!(result.into_string().unwrap(), "sum is 42");
983    }
984
985    // A module's exported function must be able to call the store builtins (here
986    // `.append`), not just pure nushell. (rokf94's report; the engine carries
987    // the builtins before modules load.)
988    #[tokio::test]
989    async fn test_module_can_use_append_builtin() {
990        use crate::store::{Frame, Store};
991        use nu_protocol::{PipelineData, Span};
992
993        let temp_dir = tempfile::tempdir().unwrap();
994        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
995
996        let hash = store
997            .cas_insert_sync(r#"export def emit [] { {} | .append out --meta {via: "module"} }"#)
998            .unwrap();
999        store
1000            .append(Frame::builder("xs.module.emitter").hash(hash).build())
1001            .unwrap();
1002
1003        let engine = super::eval_engine(&store).unwrap();
1004        let value = engine
1005            .eval(
1006                PipelineData::empty(),
1007                "use emitter; emitter emit".to_string(),
1008            )
1009            .unwrap()
1010            .into_value(Span::test_data())
1011            .unwrap();
1012
1013        // `.append` ran inside the module and returned the appended frame.
1014        let json = crate::nu::value_to_json(&value);
1015        assert_eq!(json["topic"], "out");
1016        assert_eq!(json["meta"]["via"], "module");
1017    }
1018}