Skip to main content

xs/
api.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use chrono::{DateTime, Utc};
6use scru128::Scru128Id;
7
8use base64::Engine;
9
10use tokio::io::AsyncWriteExt;
11use tokio_stream::wrappers::ReceiverStream;
12use tokio_stream::StreamExt;
13use tokio_util::io::ReaderStream;
14
15use http_body_util::StreamBody;
16use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
17use hyper::body::Bytes;
18use hyper::header::ACCEPT;
19use hyper::server::conn::http1;
20use hyper::service::service_fn;
21use hyper::{Method, Request, Response, StatusCode};
22use hyper_util::rt::TokioIo;
23
24use crate::listener::Listener;
25use crate::nu;
26use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
27
28type BoxError = Box<dyn std::error::Error + Send + Sync>;
29type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
30
31/// Serialize a frame to JSON, optionally including a timestamp extracted from the scru128 ID
32fn serialize_frame(frame: &Frame, with_timestamp: bool) -> String {
33    if with_timestamp {
34        let mut value = serde_json::to_value(frame).unwrap();
35        if let serde_json::Value::Object(ref mut map) = value {
36            let millis = frame.id.timestamp() as i64;
37            let dt: DateTime<Utc> = DateTime::from_timestamp_millis(millis)
38                .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
39            map.insert("timestamp".to_string(), serde_json::json!(dt.to_rfc3339()));
40        }
41        serde_json::to_string(&value).unwrap()
42    } else {
43        serde_json::to_string(frame).unwrap()
44    }
45}
46
47#[derive(Debug, PartialEq, Clone)]
48enum AcceptType {
49    Ndjson,
50    EventStream,
51}
52
53enum Routes {
54    StreamCat {
55        accept_type: AcceptType,
56        options: ReadOptions,
57        with_timestamp: bool,
58    },
59    StreamAppend {
60        topic: String,
61        ttl: Option<TTL>,
62        with_timestamp: bool,
63    },
64    LastGet {
65        topic: Option<String>,
66        last: usize,
67        follow: bool,
68        with_timestamp: bool,
69    },
70    StreamItemGet {
71        id: Scru128Id,
72        with_timestamp: bool,
73    },
74    StreamItemRemove(Scru128Id),
75    CasGet(ssri::Integrity),
76    CasPost,
77    Import,
78    Eval,
79    Version,
80    NotFound,
81    BadRequest(String),
82}
83
84/// Validates an Integrity object to ensure all its hashes are properly formatted
85fn validate_integrity(integrity: &ssri::Integrity) -> bool {
86    // Check if there are any hashes
87    if integrity.hashes.is_empty() {
88        return false;
89    }
90
91    // For each hash, check if it has a valid base64-encoded digest
92    for hash in &integrity.hashes {
93        // Check if digest is valid base64 using the modern API
94        if base64::engine::general_purpose::STANDARD
95            .decode(&hash.digest)
96            .is_err()
97        {
98            return false;
99        }
100    }
101
102    true
103}
104
105fn match_route(
106    method: &Method,
107    path: &str,
108    headers: &hyper::HeaderMap,
109    query: Option<&str>,
110) -> Routes {
111    let params: HashMap<String, String> =
112        url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
113            .into_owned()
114            .collect();
115
116    match (method, path) {
117        (&Method::GET, "/version") => Routes::Version,
118
119        (&Method::GET, "/") => {
120            let accept_type = match headers.get(ACCEPT) {
121                Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
122                _ => AcceptType::Ndjson,
123            };
124
125            let options = ReadOptions::from_query(query);
126            let with_timestamp = params.contains_key("with-timestamp");
127
128            match options {
129                Ok(options) => Routes::StreamCat {
130                    accept_type,
131                    options,
132                    with_timestamp,
133                },
134                Err(e) => Routes::BadRequest(e.to_string()),
135            }
136        }
137
138        (&Method::GET, "/last") => {
139            let follow = params.contains_key("follow");
140            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
141            let with_timestamp = params.contains_key("with-timestamp");
142            Routes::LastGet {
143                topic: None,
144                last,
145                follow,
146                with_timestamp,
147            }
148        }
149
150        (&Method::GET, p) if p.starts_with("/last/") => {
151            let topic = p.strip_prefix("/last/").unwrap().to_string();
152            let follow = params.contains_key("follow");
153            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
154            let with_timestamp = params.contains_key("with-timestamp");
155            Routes::LastGet {
156                topic: Some(topic),
157                last,
158                follow,
159                with_timestamp,
160            }
161        }
162
163        (&Method::GET, p) if p.starts_with("/cas/") => {
164            if let Some(hash) = p.strip_prefix("/cas/") {
165                match ssri::Integrity::from_str(hash) {
166                    Ok(integrity) => {
167                        if validate_integrity(&integrity) {
168                            Routes::CasGet(integrity)
169                        } else {
170                            Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
171                        }
172                    }
173                    Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
174                }
175            } else {
176                Routes::NotFound
177            }
178        }
179
180        (&Method::POST, "/cas") => Routes::CasPost,
181        (&Method::POST, "/import") => Routes::Import,
182        (&Method::POST, "/eval") => Routes::Eval,
183
184        (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
185            Ok(id) => {
186                let with_timestamp = params.contains_key("with-timestamp");
187                Routes::StreamItemGet { id, with_timestamp }
188            }
189            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
190        },
191
192        (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
193            Ok(id) => Routes::StreamItemRemove(id),
194            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
195        },
196
197        (&Method::POST, path) if path.starts_with("/append/") => {
198            let topic = path.strip_prefix("/append/").unwrap().to_string();
199            let with_timestamp = params.contains_key("with-timestamp");
200
201            match TTL::from_query(query) {
202                Ok(ttl) => Routes::StreamAppend {
203                    topic,
204                    ttl: Some(ttl),
205                    with_timestamp,
206                },
207                Err(e) => Routes::BadRequest(e.to_string()),
208            }
209        }
210
211        _ => Routes::NotFound,
212    }
213}
214
215async fn handle(
216    mut store: Store,
217    _engine: nu::Engine, // TODO: potentially vestigial, will .process come back?
218    req: Request<hyper::body::Incoming>,
219) -> HTTPResult {
220    let method = req.method();
221    let path = req.uri().path();
222    let headers = req.headers().clone();
223    let query = req.uri().query();
224
225    let res = match match_route(method, path, &headers, query) {
226        Routes::Version => handle_version().await,
227
228        Routes::StreamCat {
229            accept_type,
230            options,
231            with_timestamp,
232        } => handle_stream_cat(&mut store, options, accept_type, with_timestamp).await,
233
234        Routes::StreamAppend {
235            topic,
236            ttl,
237            with_timestamp,
238        } => handle_stream_append(&mut store, req, topic, ttl, with_timestamp).await,
239
240        Routes::CasGet(hash) => {
241            let reader = store.cas_reader(hash).await?;
242            let stream = ReaderStream::new(reader);
243
244            let stream = stream.map(|frame| {
245                let frame = frame.unwrap();
246                Ok(hyper::body::Frame::data(frame))
247            });
248
249            let body = StreamBody::new(stream).boxed();
250            Ok(Response::new(body))
251        }
252
253        Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
254
255        Routes::StreamItemGet { id, with_timestamp } => {
256            response_frame_or_404(store.get(&id), with_timestamp)
257        }
258
259        Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
260
261        Routes::LastGet {
262            topic,
263            last,
264            follow,
265            with_timestamp,
266        } => handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp).await,
267
268        Routes::Import => handle_import(&mut store, req.into_body()).await,
269
270        Routes::Eval => handle_eval(&store, req.into_body()).await,
271
272        Routes::NotFound => response_404(),
273        Routes::BadRequest(msg) => response_400(msg),
274    };
275
276    res.or_else(|e| response_500(e.to_string()))
277}
278
279async fn handle_stream_cat(
280    store: &mut Store,
281    options: ReadOptions,
282    accept_type: AcceptType,
283    with_timestamp: bool,
284) -> HTTPResult {
285    let rx = store.read(options).await;
286    let stream = ReceiverStream::new(rx);
287
288    let accept_type_clone = accept_type.clone();
289    let stream = stream.map(move |frame| {
290        let bytes = match accept_type_clone {
291            AcceptType::Ndjson => {
292                let mut encoded = serialize_frame(&frame, with_timestamp).into_bytes();
293                encoded.push(b'\n');
294                encoded
295            }
296            AcceptType::EventStream => format!(
297                "id: {id}\ndata: {data}\n\n",
298                id = frame.id,
299                data = serialize_frame(&frame, with_timestamp)
300            )
301            .into_bytes(),
302        };
303        Ok(hyper::body::Frame::data(Bytes::from(bytes)))
304    });
305
306    let body = StreamBody::new(stream).boxed();
307
308    let content_type = match accept_type {
309        AcceptType::Ndjson => "application/x-ndjson",
310        AcceptType::EventStream => "text/event-stream",
311    };
312
313    Ok(Response::builder()
314        .status(StatusCode::OK)
315        .header("Content-Type", content_type)
316        .body(body)?)
317}
318
319async fn handle_stream_append(
320    store: &mut Store,
321    req: Request<hyper::body::Incoming>,
322    topic: String,
323    ttl: Option<TTL>,
324    with_timestamp: bool,
325) -> HTTPResult {
326    let (parts, mut body) = req.into_parts();
327
328    let hash = {
329        let mut writer = store.cas_writer().await?;
330        let mut bytes_written = 0;
331
332        while let Some(frame) = body.frame().await {
333            if let Ok(data) = frame?.into_data() {
334                writer.write_all(&data).await?;
335                bytes_written += data.len();
336            }
337        }
338
339        if bytes_written > 0 {
340            Some(writer.commit().await?)
341        } else {
342            None
343        }
344    };
345
346    let meta = match parts
347        .headers
348        .get("xs-meta")
349        .map(|x| x.to_str())
350        .transpose()
351        .unwrap()
352        .map(|s| {
353            // First decode the Base64-encoded string
354            base64::prelude::BASE64_STANDARD
355                .decode(s)
356                .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
357                .and_then(|decoded| {
358                    // Then parse the decoded bytes as UTF-8 string
359                    String::from_utf8(decoded)
360                        .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
361                        .and_then(|json_str| {
362                            // Finally parse the UTF-8 string as JSON
363                            serde_json::from_str(&json_str)
364                                .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
365                        })
366                })
367        })
368        .transpose()
369    {
370        Ok(meta) => meta,
371        Err(e) => return response_400(e.to_string()),
372    };
373
374    let frame = store.append(
375        Frame::builder(topic)
376            .maybe_hash(hash)
377            .maybe_meta(meta)
378            .maybe_ttl(ttl)
379            .build(),
380    )?;
381
382    Ok(Response::builder()
383        .status(StatusCode::OK)
384        .header("Content-Type", "application/json")
385        .body(full(serialize_frame(&frame, with_timestamp)))?)
386}
387
388async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
389    let hash = {
390        let mut writer = store.cas_writer().await?;
391        let mut bytes_written = 0;
392
393        while let Some(frame) = body.frame().await {
394            if let Ok(data) = frame?.into_data() {
395                writer.write_all(&data).await?;
396                bytes_written += data.len();
397            }
398        }
399
400        if bytes_written == 0 {
401            return response_400("Empty body".to_string());
402        }
403
404        writer.commit().await?
405    };
406
407    Ok(Response::builder()
408        .status(StatusCode::OK)
409        .header("Content-Type", "text/plain")
410        .body(full(hash.to_string()))?)
411}
412
413async fn handle_version() -> HTTPResult {
414    let version = env!("CARGO_PKG_VERSION");
415    let version_info = serde_json::json!({ "version": version });
416    Ok(Response::builder()
417        .status(StatusCode::OK)
418        .header("Content-Type", "application/json")
419        .body(full(serde_json::to_string(&version_info).unwrap()))?)
420}
421
422pub async fn serve(
423    store: Store,
424    engine: nu::Engine,
425    expose: Option<String>,
426) -> Result<(), BoxError> {
427    let path = store.path.join("sock").to_string_lossy().to_string();
428    let listener = Listener::bind(&path).await?;
429
430    let mut listeners = vec![listener];
431    let mut expose_meta = None;
432
433    if let Some(expose) = expose {
434        let expose_listener = Listener::bind(&expose).await?;
435
436        // Check if this is an iroh listener and get the ticket
437        if let Some(ticket) = expose_listener.get_ticket() {
438            expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
439        } else {
440            expose_meta = Some(serde_json::json!({"expose": expose}));
441        }
442
443        listeners.push(expose_listener);
444    }
445
446    if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
447        tracing::error!("Failed to append xs.start frame: {}", e);
448    }
449
450    let mut tasks = Vec::new();
451    for listener in listeners {
452        let store = store.clone();
453        let engine = engine.clone();
454        let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
455        tasks.push(task);
456    }
457
458    // TODO: graceful shutdown and error handling
459    // Wait for all listener tasks to complete (or until the first error)
460    for task in tasks {
461        task.await??;
462    }
463
464    Ok(())
465}
466
467async fn listener_loop(
468    mut listener: Listener,
469    store: Store,
470    engine: nu::Engine,
471) -> Result<(), BoxError> {
472    loop {
473        let (stream, _) = listener.accept().await?;
474        let io = TokioIo::new(stream);
475        let store = store.clone();
476        let engine = engine.clone();
477        tokio::task::spawn(async move {
478            if let Err(err) = http1::Builder::new()
479                .serve_connection(
480                    io,
481                    service_fn(move |req| handle(store.clone(), engine.clone(), req)),
482                )
483                .await
484            {
485                // Match against the error kind to selectively ignore `NotConnected` errors
486                if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
487                    source
488                        .downcast_ref::<std::io::Error>()
489                        .map(|io_err| io_err.kind())
490                }) {
491                    // ignore the NotConnected error, hyper's way of saying the client disconnected
492                } else {
493                    // todo, Handle or log other errors
494                    tracing::error!("TBD: {:?}", err);
495                }
496            }
497        });
498    }
499}
500
501fn response_frame_or_404(frame: Option<store::Frame>, with_timestamp: bool) -> HTTPResult {
502    if let Some(frame) = frame {
503        Ok(Response::builder()
504            .status(StatusCode::OK)
505            .header("Content-Type", "application/json")
506            .body(full(serialize_frame(&frame, with_timestamp)))?)
507    } else {
508        response_404()
509    }
510}
511
512async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
513    match store.remove(&id) {
514        Ok(()) => Ok(Response::builder()
515            .status(StatusCode::NO_CONTENT)
516            .body(empty())?),
517        Err(e) => {
518            tracing::error!("Failed to remove item {}: {:?}", id, e);
519
520            Ok(Response::builder()
521                .status(StatusCode::INTERNAL_SERVER_ERROR)
522                .body(full("internal-error"))?)
523        }
524    }
525}
526
527async fn handle_last_get(
528    store: &Store,
529    topic: Option<&str>,
530    last: usize,
531    follow: bool,
532    with_timestamp: bool,
533) -> HTTPResult {
534    if !follow {
535        let options = ReadOptions::builder()
536            .last(last)
537            .maybe_topic(topic.map(|t| t.to_string()))
538            .build();
539
540        let frames: Vec<Frame> = store.read_sync(options).collect();
541
542        if frames.is_empty() {
543            return response_404();
544        }
545
546        // Format based on request, not result count:
547        // - last == 1 (default): JSON object
548        // - last > 1: NDJSON (even if fewer frames returned)
549        if last == 1 {
550            return response_frame_or_404(Some(frames.into_iter().next().unwrap()), with_timestamp);
551        }
552
553        let mut body = Vec::new();
554        for frame in frames {
555            body.extend(serialize_frame(&frame, with_timestamp).into_bytes());
556            body.push(b'\n');
557        }
558
559        return Ok(Response::builder()
560            .status(StatusCode::OK)
561            .header("Content-Type", "application/x-ndjson")
562            .body(full(body))?);
563    }
564
565    // Follow mode: use ReadOptions::last to get historical + live frames
566    // This emits xs.threshold after historical frames
567    let rx = store
568        .read(
569            ReadOptions::builder()
570                .last(last)
571                .maybe_topic(topic.map(|t| t.to_string()))
572                .follow(FollowOption::On)
573                .build(),
574        )
575        .await;
576
577    let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(move |frame| {
578        let mut bytes = serialize_frame(&frame, with_timestamp).into_bytes();
579        bytes.push(b'\n');
580        Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
581    });
582
583    Ok(Response::builder()
584        .status(StatusCode::OK)
585        .header("Content-Type", "application/x-ndjson")
586        .body(StreamBody::new(stream).boxed())?)
587}
588
589async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
590    let bytes = body.collect().await?.to_bytes();
591    let frame: Frame = match serde_json::from_slice(&bytes) {
592        Ok(frame) => frame,
593        Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
594    };
595
596    store.insert_frame(&frame)?;
597
598    Ok(Response::builder()
599        .status(StatusCode::OK)
600        .header("Content-Type", "application/json")
601        .body(full(serde_json::to_string(&frame).unwrap()))?)
602}
603
604fn response_404() -> HTTPResult {
605    Ok(Response::builder()
606        .status(StatusCode::NOT_FOUND)
607        .body(empty())?)
608}
609
610fn response_400(message: String) -> HTTPResult {
611    let body = full(message);
612    Ok(Response::builder()
613        .status(StatusCode::BAD_REQUEST)
614        .body(body)?)
615}
616
617fn response_500(message: String) -> HTTPResult {
618    let body = full(message);
619    Ok(Response::builder()
620        .status(StatusCode::INTERNAL_SERVER_ERROR)
621        .body(body)?)
622}
623
624fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
625    Full::new(chunk.into())
626        .map_err(|never| match never {})
627        .boxed()
628}
629
630fn empty() -> BoxBody<Bytes, BoxError> {
631    Empty::<Bytes>::new()
632        .map_err(|never| match never {})
633        .boxed()
634}
635
636async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
637    // Read the script from the request body
638    let bytes = body.collect().await?.to_bytes();
639    let script =
640        String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
641
642    // Create nushell engine with store helper commands
643    let mut engine =
644        nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
645
646    // Add core commands
647    nu::add_core_commands(&mut engine, store)
648        .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
649
650    // Add streaming commands
651    engine
652        .add_commands(vec![
653            Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
654                store.clone(),
655            )),
656            Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
657                store.clone(),
658            )),
659            Box::new(nu::commands::append_command::AppendCommand::new(
660                store.clone(),
661                serde_json::Value::Null,
662            )),
663        ])
664        .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
665
666    // Execute the script
667    let result = engine
668        .eval(nu_protocol::PipelineData::empty(), script)
669        .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
670
671    // Format output based on PipelineData type according to spec
672    match result {
673        nu_protocol::PipelineData::ByteStream(stream, ..) => {
674            // ByteStream → raw bytes with proper streaming using channel pattern
675            if let Some(mut reader) = stream.reader() {
676                use std::io::Read;
677
678                let (tx, rx) = tokio::sync::mpsc::channel(16);
679
680                // Spawn sync task to read from nushell Reader and send to channel
681                std::thread::spawn(move || {
682                    let mut buffer = [0u8; 8192];
683                    loop {
684                        match reader.read(&mut buffer) {
685                            Ok(0) => break, // EOF
686                            Ok(n) => {
687                                let chunk = Bytes::copy_from_slice(&buffer[..n]);
688                                if tx
689                                    .blocking_send(Ok(hyper::body::Frame::data(chunk)))
690                                    .is_err()
691                                {
692                                    break;
693                                }
694                            }
695                            Err(e) => {
696                                let _ = tx.blocking_send(Err(
697                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>
698                                ));
699                                break;
700                            }
701                        }
702                    }
703                });
704
705                let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
706                let body = StreamBody::new(stream).boxed();
707                Ok(Response::builder()
708                    .status(StatusCode::OK)
709                    .header("Content-Type", "application/octet-stream")
710                    .body(body)?)
711            } else {
712                // No reader available, return empty response
713                Ok(Response::builder()
714                    .status(StatusCode::OK)
715                    .header("Content-Type", "application/octet-stream")
716                    .body(empty())?)
717            }
718        }
719        nu_protocol::PipelineData::ListStream(stream, ..) => {
720            // ListStream → JSONL stream with proper streaming using channel pattern
721            let (tx, rx) = tokio::sync::mpsc::channel(16);
722
723            // Spawn sync task to iterate stream and send JSONL to channel
724            std::thread::spawn(move || {
725                for value in stream.into_iter() {
726                    let json = nu::value_to_json(&value);
727                    match serde_json::to_vec(&json) {
728                        Ok(mut json_bytes) => {
729                            json_bytes.push(b'\n'); // Add newline for JSONL
730                            let chunk = Bytes::from(json_bytes);
731                            if tx
732                                .blocking_send(Ok(hyper::body::Frame::data(chunk)))
733                                .is_err()
734                            {
735                                break;
736                            }
737                        }
738                        Err(e) => {
739                            let _ = tx.blocking_send(Err(
740                                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
741                            ));
742                            break;
743                        }
744                    }
745                }
746            });
747
748            let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
749            let body = StreamBody::new(stream).boxed();
750            Ok(Response::builder()
751                .status(StatusCode::OK)
752                .header("Content-Type", "application/x-ndjson")
753                .body(body)?)
754        }
755        nu_protocol::PipelineData::Value(value, ..) => {
756            match &value {
757                nu_protocol::Value::String { .. }
758                | nu_protocol::Value::Int { .. }
759                | nu_protocol::Value::Float { .. }
760                | nu_protocol::Value::Bool { .. } => {
761                    // Single primitive value → raw text
762                    let text = match value {
763                        nu_protocol::Value::String { val, .. } => val.clone(),
764                        nu_protocol::Value::Int { val, .. } => val.to_string(),
765                        nu_protocol::Value::Float { val, .. } => val.to_string(),
766                        nu_protocol::Value::Bool { val, .. } => val.to_string(),
767                        _ => value.into_string().unwrap_or_else(|_| "".to_string()),
768                    };
769                    Ok(Response::builder()
770                        .status(StatusCode::OK)
771                        .header("Content-Type", "text/plain")
772                        .body(full(text))?)
773                }
774                _ => {
775                    // Single structured value → JSON
776                    let json = nu::value_to_json(&value);
777                    let json_string = serde_json::to_string(&json)
778                        .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
779                    Ok(Response::builder()
780                        .status(StatusCode::OK)
781                        .header("Content-Type", "application/json")
782                        .body(full(json_string))?)
783                }
784            }
785        }
786        nu_protocol::PipelineData::Empty => {
787            // Empty → nothing
788            Ok(Response::builder()
789                .status(StatusCode::NO_CONTENT)
790                .body(empty())?)
791        }
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798
799    #[test]
800    fn test_match_route_last() {
801        let headers = hyper::HeaderMap::new();
802
803        // /last/topic without follow
804        assert!(matches!(
805            match_route(&Method::GET, "/last/test", &headers, None),
806            Routes::LastGet { topic: Some(t), last: 1, follow: false, .. } if t == "test"
807        ));
808
809        // /last/topic with follow
810        assert!(matches!(
811            match_route(&Method::GET, "/last/test", &headers, Some("follow=true")),
812            Routes::LastGet { topic: Some(t), last: 1, follow: true, .. } if t == "test"
813        ));
814
815        // /last without topic
816        assert!(matches!(
817            match_route(&Method::GET, "/last", &headers, None),
818            Routes::LastGet {
819                topic: None,
820                last: 1,
821                follow: false,
822                ..
823            }
824        ));
825
826        // /last with last=5
827        assert!(matches!(
828            match_route(&Method::GET, "/last", &headers, Some("last=5")),
829            Routes::LastGet {
830                topic: None,
831                last: 5,
832                follow: false,
833                ..
834            }
835        ));
836
837        // /last/topic with last=3 and follow
838        assert!(matches!(
839            match_route(&Method::GET, "/last/test", &headers, Some("last=3&follow=true")),
840            Routes::LastGet { topic: Some(t), last: 3, follow: true, .. } if t == "test"
841        ));
842    }
843
844    #[tokio::test]
845    async fn test_handle_last_get_ndjson_format() {
846        use crate::store::Store;
847        use http_body_util::BodyExt;
848
849        let temp_dir = tempfile::tempdir().unwrap();
850        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
851
852        // Add one frame
853        store
854            .append(crate::store::Frame::builder("test").build())
855            .unwrap();
856
857        // Request last=1 (default) - should return JSON object (no trailing newline)
858        let response = handle_last_get(&store, Some("test"), 1, false, false)
859            .await
860            .unwrap();
861        let body = response.into_body().collect().await.unwrap().to_bytes();
862        let body_str = String::from_utf8(body.to_vec()).unwrap();
863        assert!(
864            !body_str.ends_with('\n'),
865            "Response should be JSON (no trailing newline) when last=1, got: {:?}",
866            body_str
867        );
868
869        // Request last=3 but only 1 frame exists - should still return NDJSON
870        let response = handle_last_get(&store, Some("test"), 3, false, false)
871            .await
872            .unwrap();
873        let body = response.into_body().collect().await.unwrap().to_bytes();
874        let body_str = String::from_utf8(body.to_vec()).unwrap();
875        assert!(
876            body_str.ends_with('\n'),
877            "Response should be NDJSON (end with newline) when last > 1, got: {:?}",
878            body_str
879        );
880
881        // Verify full chain: match_route -> dispatch -> handle_last_get
882        let headers = hyper::HeaderMap::new();
883        let route = match_route(&Method::GET, "/last/test", &headers, Some("last=3"));
884        if let Routes::LastGet {
885            topic,
886            last,
887            follow,
888            with_timestamp,
889        } = route
890        {
891            assert_eq!(last, 3, "Route should parse last=3");
892            let response = handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp)
893                .await
894                .unwrap();
895            let body = response.into_body().collect().await.unwrap().to_bytes();
896            let body_str = String::from_utf8(body.to_vec()).unwrap();
897            assert!(
898                body_str.ends_with('\n'),
899                "Full chain should return NDJSON when last=3, got: {:?}",
900                body_str
901            );
902        } else {
903            panic!("Expected Routes::LastGet");
904        }
905    }
906
907    #[tokio::test]
908    async fn test_handle_eval_logic() {
909        // Test the core nushell execution logic by testing the engine directly
910        use crate::nu::Engine;
911        use crate::store::Store;
912        use nu_protocol::PipelineData;
913
914        // Create a temporary store for testing
915        let temp_dir = tempfile::tempdir().unwrap();
916        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
917
918        // Create nushell engine with store helper commands
919        let mut engine = Engine::new().unwrap();
920
921        // Add core commands
922        crate::nu::add_core_commands(&mut engine, &store).unwrap();
923
924        // Add streaming commands
925        engine
926            .add_commands(vec![
927                Box::new(
928                    crate::nu::commands::cat_stream_command::CatStreamCommand::new(store.clone()),
929                ),
930                Box::new(
931                    crate::nu::commands::last_stream_command::LastStreamCommand::new(store.clone()),
932                ),
933                Box::new(crate::nu::commands::append_command::AppendCommand::new(
934                    store.clone(),
935                    serde_json::Value::Null,
936                )),
937            ])
938            .unwrap();
939
940        // Test simple string expression
941        let result = engine
942            .eval(PipelineData::empty(), r#""hello world""#.to_string())
943            .unwrap();
944
945        match result {
946            PipelineData::Value(value, ..) => {
947                let text = value.into_string().unwrap();
948                assert_eq!(text, "hello world");
949            }
950            _ => panic!("Expected Value, got {:?}", result),
951        }
952
953        // Test simple math expression - result should be an integer
954        let result = engine
955            .eval(PipelineData::empty(), "2 + 3".to_string())
956            .unwrap();
957
958        match result {
959            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
960                assert_eq!(val, 5);
961            }
962            _ => panic!("Expected Int Value, got {:?}", result),
963        }
964    }
965}