Skip to main content

xs/
api.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
27type BoxError = Box<dyn std::error::Error + Send + Sync>;
28type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
30#[derive(Debug, PartialEq, Clone)]
31enum AcceptType {
32    Ndjson,
33    EventStream,
34}
35
36enum Routes {
37    StreamCat {
38        accept_type: AcceptType,
39        options: ReadOptions,
40    },
41    StreamAppend {
42        topic: String,
43        ttl: Option<TTL>,
44    },
45    LastGet {
46        topic: Option<String>,
47        last: usize,
48        follow: bool,
49    },
50    StreamItemGet(Scru128Id),
51    StreamItemRemove(Scru128Id),
52    CasGet(ssri::Integrity),
53    CasPost,
54    Import,
55    Eval,
56    Version,
57    NotFound,
58    BadRequest(String),
59}
60
61/// Validates an Integrity object to ensure all its hashes are properly formatted
62fn validate_integrity(integrity: &ssri::Integrity) -> bool {
63    // Check if there are any hashes
64    if integrity.hashes.is_empty() {
65        return false;
66    }
67
68    // For each hash, check if it has a valid base64-encoded digest
69    for hash in &integrity.hashes {
70        // Check if digest is valid base64 using the modern API
71        if base64::engine::general_purpose::STANDARD
72            .decode(&hash.digest)
73            .is_err()
74        {
75            return false;
76        }
77    }
78
79    true
80}
81
82fn match_route(
83    method: &Method,
84    path: &str,
85    headers: &hyper::HeaderMap,
86    query: Option<&str>,
87) -> Routes {
88    let params: HashMap<String, String> =
89        url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
90            .into_owned()
91            .collect();
92
93    match (method, path) {
94        (&Method::GET, "/version") => Routes::Version,
95
96        (&Method::GET, "/") => {
97            let accept_type = match headers.get(ACCEPT) {
98                Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
99                _ => AcceptType::Ndjson,
100            };
101
102            let options = ReadOptions::from_query(query);
103
104            match options {
105                Ok(options) => Routes::StreamCat {
106                    accept_type,
107                    options,
108                },
109                Err(e) => Routes::BadRequest(e.to_string()),
110            }
111        }
112
113        (&Method::GET, "/last") => {
114            let follow = params.contains_key("follow");
115            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
116            Routes::LastGet {
117                topic: None,
118                last,
119                follow,
120            }
121        }
122
123        (&Method::GET, p) if p.starts_with("/last/") => {
124            let topic = p.strip_prefix("/last/").unwrap().to_string();
125            let follow = params.contains_key("follow");
126            let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
127            Routes::LastGet {
128                topic: Some(topic),
129                last,
130                follow,
131            }
132        }
133
134        (&Method::GET, p) if p.starts_with("/cas/") => {
135            if let Some(hash) = p.strip_prefix("/cas/") {
136                match ssri::Integrity::from_str(hash) {
137                    Ok(integrity) => {
138                        if validate_integrity(&integrity) {
139                            Routes::CasGet(integrity)
140                        } else {
141                            Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
142                        }
143                    }
144                    Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
145                }
146            } else {
147                Routes::NotFound
148            }
149        }
150
151        (&Method::POST, "/cas") => Routes::CasPost,
152        (&Method::POST, "/import") => Routes::Import,
153        (&Method::POST, "/eval") => Routes::Eval,
154
155        (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
156            Ok(id) => Routes::StreamItemGet(id),
157            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
158        },
159
160        (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
161            Ok(id) => Routes::StreamItemRemove(id),
162            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
163        },
164
165        (&Method::POST, path) if path.starts_with("/append/") => {
166            let topic = path.strip_prefix("/append/").unwrap().to_string();
167
168            match TTL::from_query(query) {
169                Ok(ttl) => Routes::StreamAppend {
170                    topic,
171                    ttl: Some(ttl),
172                },
173                Err(e) => Routes::BadRequest(e.to_string()),
174            }
175        }
176
177        _ => Routes::NotFound,
178    }
179}
180
181async fn handle(
182    mut store: Store,
183    _engine: nu::Engine, // TODO: potentially vestigial, will .process come back?
184    req: Request<hyper::body::Incoming>,
185) -> HTTPResult {
186    let method = req.method();
187    let path = req.uri().path();
188    let headers = req.headers().clone();
189    let query = req.uri().query();
190
191    let res = match match_route(method, path, &headers, query) {
192        Routes::Version => handle_version().await,
193
194        Routes::StreamCat {
195            accept_type,
196            options,
197        } => handle_stream_cat(&mut store, options, accept_type).await,
198
199        Routes::StreamAppend { topic, ttl } => {
200            handle_stream_append(&mut store, req, topic, ttl).await
201        }
202
203        Routes::CasGet(hash) => {
204            let reader = store.cas_reader(hash).await?;
205            let stream = ReaderStream::new(reader);
206
207            let stream = stream.map(|frame| {
208                let frame = frame.unwrap();
209                Ok(hyper::body::Frame::data(frame))
210            });
211
212            let body = StreamBody::new(stream).boxed();
213            Ok(Response::new(body))
214        }
215
216        Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
217
218        Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
219
220        Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
221
222        Routes::LastGet {
223            topic,
224            last,
225            follow,
226        } => handle_last_get(&store, topic.as_deref(), last, follow).await,
227
228        Routes::Import => handle_import(&mut store, req.into_body()).await,
229
230        Routes::Eval => handle_eval(&store, req.into_body()).await,
231
232        Routes::NotFound => response_404(),
233        Routes::BadRequest(msg) => response_400(msg),
234    };
235
236    res.or_else(|e| response_500(e.to_string()))
237}
238
239async fn handle_stream_cat(
240    store: &mut Store,
241    options: ReadOptions,
242    accept_type: AcceptType,
243) -> HTTPResult {
244    let rx = store.read(options).await;
245    let stream = ReceiverStream::new(rx);
246
247    let accept_type_clone = accept_type.clone();
248    let stream = stream.map(move |frame| {
249        let bytes = match accept_type_clone {
250            AcceptType::Ndjson => {
251                let mut encoded = serde_json::to_vec(&frame).unwrap();
252                encoded.push(b'\n');
253                encoded
254            }
255            AcceptType::EventStream => format!(
256                "id: {id}\ndata: {data}\n\n",
257                id = frame.id,
258                data = serde_json::to_string(&frame).unwrap_or_default()
259            )
260            .into_bytes(),
261        };
262        Ok(hyper::body::Frame::data(Bytes::from(bytes)))
263    });
264
265    let body = StreamBody::new(stream).boxed();
266
267    let content_type = match accept_type {
268        AcceptType::Ndjson => "application/x-ndjson",
269        AcceptType::EventStream => "text/event-stream",
270    };
271
272    Ok(Response::builder()
273        .status(StatusCode::OK)
274        .header("Content-Type", content_type)
275        .body(body)?)
276}
277
278async fn handle_stream_append(
279    store: &mut Store,
280    req: Request<hyper::body::Incoming>,
281    topic: String,
282    ttl: Option<TTL>,
283) -> HTTPResult {
284    let (parts, mut body) = req.into_parts();
285
286    let hash = {
287        let mut writer = store.cas_writer().await?;
288        let mut bytes_written = 0;
289
290        while let Some(frame) = body.frame().await {
291            if let Ok(data) = frame?.into_data() {
292                writer.write_all(&data).await?;
293                bytes_written += data.len();
294            }
295        }
296
297        if bytes_written > 0 {
298            Some(writer.commit().await?)
299        } else {
300            None
301        }
302    };
303
304    let meta = match parts
305        .headers
306        .get("xs-meta")
307        .map(|x| x.to_str())
308        .transpose()
309        .unwrap()
310        .map(|s| {
311            // First decode the Base64-encoded string
312            base64::prelude::BASE64_STANDARD
313                .decode(s)
314                .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
315                .and_then(|decoded| {
316                    // Then parse the decoded bytes as UTF-8 string
317                    String::from_utf8(decoded)
318                        .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
319                        .and_then(|json_str| {
320                            // Finally parse the UTF-8 string as JSON
321                            serde_json::from_str(&json_str)
322                                .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
323                        })
324                })
325        })
326        .transpose()
327    {
328        Ok(meta) => meta,
329        Err(e) => return response_400(e.to_string()),
330    };
331
332    let frame = store.append(
333        Frame::builder(topic)
334            .maybe_hash(hash)
335            .maybe_meta(meta)
336            .maybe_ttl(ttl)
337            .build(),
338    )?;
339
340    Ok(Response::builder()
341        .status(StatusCode::OK)
342        .header("Content-Type", "application/json")
343        .body(full(serde_json::to_string(&frame).unwrap()))?)
344}
345
346async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
347    let hash = {
348        let mut writer = store.cas_writer().await?;
349        let mut bytes_written = 0;
350
351        while let Some(frame) = body.frame().await {
352            if let Ok(data) = frame?.into_data() {
353                writer.write_all(&data).await?;
354                bytes_written += data.len();
355            }
356        }
357
358        if bytes_written == 0 {
359            return response_400("Empty body".to_string());
360        }
361
362        writer.commit().await?
363    };
364
365    Ok(Response::builder()
366        .status(StatusCode::OK)
367        .header("Content-Type", "text/plain")
368        .body(full(hash.to_string()))?)
369}
370
371async fn handle_version() -> HTTPResult {
372    let version = env!("CARGO_PKG_VERSION");
373    let version_info = serde_json::json!({ "version": version });
374    Ok(Response::builder()
375        .status(StatusCode::OK)
376        .header("Content-Type", "application/json")
377        .body(full(serde_json::to_string(&version_info).unwrap()))?)
378}
379
380pub async fn serve(
381    store: Store,
382    engine: nu::Engine,
383    expose: Option<String>,
384) -> Result<(), BoxError> {
385    let path = store.path.join("sock").to_string_lossy().to_string();
386    let listener = Listener::bind(&path).await?;
387
388    let mut listeners = vec![listener];
389    let mut expose_meta = None;
390
391    if let Some(expose) = expose {
392        let expose_listener = Listener::bind(&expose).await?;
393
394        // Check if this is an iroh listener and get the ticket
395        if let Some(ticket) = expose_listener.get_ticket() {
396            expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
397        } else {
398            expose_meta = Some(serde_json::json!({"expose": expose}));
399        }
400
401        listeners.push(expose_listener);
402    }
403
404    if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
405        tracing::error!("Failed to append xs.start frame: {}", e);
406    }
407
408    let mut tasks = Vec::new();
409    for listener in listeners {
410        let store = store.clone();
411        let engine = engine.clone();
412        let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
413        tasks.push(task);
414    }
415
416    // TODO: graceful shutdown and error handling
417    // Wait for all listener tasks to complete (or until the first error)
418    for task in tasks {
419        task.await??;
420    }
421
422    Ok(())
423}
424
425async fn listener_loop(
426    mut listener: Listener,
427    store: Store,
428    engine: nu::Engine,
429) -> Result<(), BoxError> {
430    loop {
431        let (stream, _) = listener.accept().await?;
432        let io = TokioIo::new(stream);
433        let store = store.clone();
434        let engine = engine.clone();
435        tokio::task::spawn(async move {
436            if let Err(err) = http1::Builder::new()
437                .serve_connection(
438                    io,
439                    service_fn(move |req| handle(store.clone(), engine.clone(), req)),
440                )
441                .await
442            {
443                // Match against the error kind to selectively ignore `NotConnected` errors
444                if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
445                    source
446                        .downcast_ref::<std::io::Error>()
447                        .map(|io_err| io_err.kind())
448                }) {
449                    // ignore the NotConnected error, hyper's way of saying the client disconnected
450                } else {
451                    // todo, Handle or log other errors
452                    tracing::error!("TBD: {:?}", err);
453                }
454            }
455        });
456    }
457}
458
459fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
460    if let Some(frame) = frame {
461        Ok(Response::builder()
462            .status(StatusCode::OK)
463            .header("Content-Type", "application/json")
464            .body(full(serde_json::to_string(&frame).unwrap()))?)
465    } else {
466        response_404()
467    }
468}
469
470async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
471    match store.remove(&id) {
472        Ok(()) => Ok(Response::builder()
473            .status(StatusCode::NO_CONTENT)
474            .body(empty())?),
475        Err(e) => {
476            tracing::error!("Failed to remove item {}: {:?}", id, e);
477
478            Ok(Response::builder()
479                .status(StatusCode::INTERNAL_SERVER_ERROR)
480                .body(full("internal-error"))?)
481        }
482    }
483}
484
485async fn handle_last_get(
486    store: &Store,
487    topic: Option<&str>,
488    last: usize,
489    follow: bool,
490) -> HTTPResult {
491    if !follow {
492        let options = ReadOptions::builder()
493            .last(last)
494            .maybe_topic(topic.map(|t| t.to_string()))
495            .build();
496
497        let frames: Vec<Frame> = store.read_sync(options).collect();
498
499        if frames.is_empty() {
500            return response_404();
501        }
502
503        // Format based on request, not result count:
504        // - last == 1 (default): JSON object
505        // - last > 1: NDJSON (even if fewer frames returned)
506        if last == 1 {
507            return response_frame_or_404(Some(frames.into_iter().next().unwrap()));
508        }
509
510        let mut body = Vec::new();
511        for frame in frames {
512            serde_json::to_writer(&mut body, &frame).unwrap();
513            body.push(b'\n');
514        }
515
516        return Ok(Response::builder()
517            .status(StatusCode::OK)
518            .header("Content-Type", "application/x-ndjson")
519            .body(full(body))?);
520    }
521
522    // Follow mode: use ReadOptions::last to get historical + live frames
523    // This emits xs.threshold after historical frames
524    let rx = store
525        .read(
526            ReadOptions::builder()
527                .last(last)
528                .maybe_topic(topic.map(|t| t.to_string()))
529                .follow(FollowOption::On)
530                .build(),
531        )
532        .await;
533
534    let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|frame| {
535        let mut bytes = serde_json::to_vec(&frame).unwrap();
536        bytes.push(b'\n');
537        Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
538    });
539
540    Ok(Response::builder()
541        .status(StatusCode::OK)
542        .header("Content-Type", "application/x-ndjson")
543        .body(StreamBody::new(stream).boxed())?)
544}
545
546async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
547    let bytes = body.collect().await?.to_bytes();
548    let frame: Frame = match serde_json::from_slice(&bytes) {
549        Ok(frame) => frame,
550        Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
551    };
552
553    store.insert_frame(&frame)?;
554
555    Ok(Response::builder()
556        .status(StatusCode::OK)
557        .header("Content-Type", "application/json")
558        .body(full(serde_json::to_string(&frame).unwrap()))?)
559}
560
561fn response_404() -> HTTPResult {
562    Ok(Response::builder()
563        .status(StatusCode::NOT_FOUND)
564        .body(empty())?)
565}
566
567fn response_400(message: String) -> HTTPResult {
568    let body = full(message);
569    Ok(Response::builder()
570        .status(StatusCode::BAD_REQUEST)
571        .body(body)?)
572}
573
574fn response_500(message: String) -> HTTPResult {
575    let body = full(message);
576    Ok(Response::builder()
577        .status(StatusCode::INTERNAL_SERVER_ERROR)
578        .body(body)?)
579}
580
581fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
582    Full::new(chunk.into())
583        .map_err(|never| match never {})
584        .boxed()
585}
586
587fn empty() -> BoxBody<Bytes, BoxError> {
588    Empty::<Bytes>::new()
589        .map_err(|never| match never {})
590        .boxed()
591}
592
593async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
594    // Read the script from the request body
595    let bytes = body.collect().await?.to_bytes();
596    let script =
597        String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
598
599    // Create nushell engine with store helper commands
600    let mut engine =
601        nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
602
603    // Add core commands
604    nu::add_core_commands(&mut engine, store)
605        .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
606
607    // Add streaming commands
608    engine
609        .add_commands(vec![
610            Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
611                store.clone(),
612            )),
613            Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
614                store.clone(),
615            )),
616            Box::new(nu::commands::append_command::AppendCommand::new(
617                store.clone(),
618                serde_json::Value::Null,
619            )),
620        ])
621        .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
622
623    // Execute the script
624    let result = engine
625        .eval(nu_protocol::PipelineData::empty(), script)
626        .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
627
628    // Format output based on PipelineData type according to spec
629    match result {
630        nu_protocol::PipelineData::ByteStream(stream, ..) => {
631            // ByteStream → raw bytes with proper streaming using channel pattern
632            if let Some(mut reader) = stream.reader() {
633                use std::io::Read;
634
635                let (tx, rx) = tokio::sync::mpsc::channel(16);
636
637                // Spawn sync task to read from nushell Reader and send to channel
638                std::thread::spawn(move || {
639                    let mut buffer = [0u8; 8192];
640                    loop {
641                        match reader.read(&mut buffer) {
642                            Ok(0) => break, // EOF
643                            Ok(n) => {
644                                let chunk = Bytes::copy_from_slice(&buffer[..n]);
645                                if tx
646                                    .blocking_send(Ok(hyper::body::Frame::data(chunk)))
647                                    .is_err()
648                                {
649                                    break;
650                                }
651                            }
652                            Err(e) => {
653                                let _ = tx.blocking_send(Err(
654                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>
655                                ));
656                                break;
657                            }
658                        }
659                    }
660                });
661
662                let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
663                let body = StreamBody::new(stream).boxed();
664                Ok(Response::builder()
665                    .status(StatusCode::OK)
666                    .header("Content-Type", "application/octet-stream")
667                    .body(body)?)
668            } else {
669                // No reader available, return empty response
670                Ok(Response::builder()
671                    .status(StatusCode::OK)
672                    .header("Content-Type", "application/octet-stream")
673                    .body(empty())?)
674            }
675        }
676        nu_protocol::PipelineData::ListStream(stream, ..) => {
677            // ListStream → JSONL stream with proper streaming using channel pattern
678            let (tx, rx) = tokio::sync::mpsc::channel(16);
679
680            // Spawn sync task to iterate stream and send JSONL to channel
681            std::thread::spawn(move || {
682                for value in stream.into_iter() {
683                    let json = nu::value_to_json(&value);
684                    match serde_json::to_vec(&json) {
685                        Ok(mut json_bytes) => {
686                            json_bytes.push(b'\n'); // Add newline for JSONL
687                            let chunk = Bytes::from(json_bytes);
688                            if tx
689                                .blocking_send(Ok(hyper::body::Frame::data(chunk)))
690                                .is_err()
691                            {
692                                break;
693                            }
694                        }
695                        Err(e) => {
696                            let _ = tx.blocking_send(Err(
697                                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
698                            ));
699                            break;
700                        }
701                    }
702                }
703            });
704
705            let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
706            let body = StreamBody::new(stream).boxed();
707            Ok(Response::builder()
708                .status(StatusCode::OK)
709                .header("Content-Type", "application/x-ndjson")
710                .body(body)?)
711        }
712        nu_protocol::PipelineData::Value(value, ..) => {
713            match &value {
714                nu_protocol::Value::String { .. }
715                | nu_protocol::Value::Int { .. }
716                | nu_protocol::Value::Float { .. }
717                | nu_protocol::Value::Bool { .. } => {
718                    // Single primitive value → raw text
719                    let text = match value {
720                        nu_protocol::Value::String { val, .. } => val.clone(),
721                        nu_protocol::Value::Int { val, .. } => val.to_string(),
722                        nu_protocol::Value::Float { val, .. } => val.to_string(),
723                        nu_protocol::Value::Bool { val, .. } => val.to_string(),
724                        _ => value.into_string().unwrap_or_else(|_| "".to_string()),
725                    };
726                    Ok(Response::builder()
727                        .status(StatusCode::OK)
728                        .header("Content-Type", "text/plain")
729                        .body(full(text))?)
730                }
731                _ => {
732                    // Single structured value → JSON
733                    let json = nu::value_to_json(&value);
734                    let json_string = serde_json::to_string(&json)
735                        .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
736                    Ok(Response::builder()
737                        .status(StatusCode::OK)
738                        .header("Content-Type", "application/json")
739                        .body(full(json_string))?)
740                }
741            }
742        }
743        nu_protocol::PipelineData::Empty => {
744            // Empty → nothing
745            Ok(Response::builder()
746                .status(StatusCode::NO_CONTENT)
747                .body(empty())?)
748        }
749    }
750}
751
752#[cfg(test)]
753mod tests {
754    use super::*;
755
756    #[test]
757    fn test_match_route_last() {
758        let headers = hyper::HeaderMap::new();
759
760        // /last/topic without follow
761        assert!(matches!(
762            match_route(&Method::GET, "/last/test", &headers, None),
763            Routes::LastGet { topic: Some(t), last: 1, follow: false } if t == "test"
764        ));
765
766        // /last/topic with follow
767        assert!(matches!(
768            match_route(&Method::GET, "/last/test", &headers, Some("follow=true")),
769            Routes::LastGet { topic: Some(t), last: 1, follow: true } if t == "test"
770        ));
771
772        // /last without topic
773        assert!(matches!(
774            match_route(&Method::GET, "/last", &headers, None),
775            Routes::LastGet {
776                topic: None,
777                last: 1,
778                follow: false
779            }
780        ));
781
782        // /last with last=5
783        assert!(matches!(
784            match_route(&Method::GET, "/last", &headers, Some("last=5")),
785            Routes::LastGet {
786                topic: None,
787                last: 5,
788                follow: false
789            }
790        ));
791
792        // /last/topic with last=3 and follow
793        assert!(matches!(
794            match_route(&Method::GET, "/last/test", &headers, Some("last=3&follow=true")),
795            Routes::LastGet { topic: Some(t), last: 3, follow: true } if t == "test"
796        ));
797    }
798
799    #[tokio::test]
800    async fn test_handle_last_get_ndjson_format() {
801        use crate::store::Store;
802        use http_body_util::BodyExt;
803
804        let temp_dir = tempfile::tempdir().unwrap();
805        let store = Store::new(temp_dir.path().to_path_buf());
806
807        // Add one frame
808        store
809            .append(crate::store::Frame::builder("test").build())
810            .unwrap();
811
812        // Request last=1 (default) - should return JSON object (no trailing newline)
813        let response = handle_last_get(&store, Some("test"), 1, false)
814            .await
815            .unwrap();
816        let body = response.into_body().collect().await.unwrap().to_bytes();
817        let body_str = String::from_utf8(body.to_vec()).unwrap();
818        assert!(
819            !body_str.ends_with('\n'),
820            "Response should be JSON (no trailing newline) when last=1, got: {:?}",
821            body_str
822        );
823
824        // Request last=3 but only 1 frame exists - should still return NDJSON
825        let response = handle_last_get(&store, Some("test"), 3, false)
826            .await
827            .unwrap();
828        let body = response.into_body().collect().await.unwrap().to_bytes();
829        let body_str = String::from_utf8(body.to_vec()).unwrap();
830        assert!(
831            body_str.ends_with('\n'),
832            "Response should be NDJSON (end with newline) when last > 1, got: {:?}",
833            body_str
834        );
835
836        // Verify full chain: match_route -> dispatch -> handle_last_get
837        let headers = hyper::HeaderMap::new();
838        let route = match_route(&Method::GET, "/last/test", &headers, Some("last=3"));
839        if let Routes::LastGet {
840            topic,
841            last,
842            follow,
843        } = route
844        {
845            assert_eq!(last, 3, "Route should parse last=3");
846            let response = handle_last_get(&store, topic.as_deref(), last, follow)
847                .await
848                .unwrap();
849            let body = response.into_body().collect().await.unwrap().to_bytes();
850            let body_str = String::from_utf8(body.to_vec()).unwrap();
851            assert!(
852                body_str.ends_with('\n'),
853                "Full chain should return NDJSON when last=3, got: {:?}",
854                body_str
855            );
856        } else {
857            panic!("Expected Routes::LastGet");
858        }
859    }
860
861    #[tokio::test]
862    async fn test_handle_eval_logic() {
863        // Test the core nushell execution logic by testing the engine directly
864        use crate::nu::Engine;
865        use crate::store::Store;
866        use nu_protocol::PipelineData;
867
868        // Create a temporary store for testing
869        let temp_dir = tempfile::tempdir().unwrap();
870        let store = Store::new(temp_dir.path().to_path_buf());
871
872        // Create nushell engine with store helper commands
873        let mut engine = Engine::new().unwrap();
874
875        // Add core commands
876        crate::nu::add_core_commands(&mut engine, &store).unwrap();
877
878        // Add streaming commands
879        engine
880            .add_commands(vec![
881                Box::new(
882                    crate::nu::commands::cat_stream_command::CatStreamCommand::new(store.clone()),
883                ),
884                Box::new(
885                    crate::nu::commands::last_stream_command::LastStreamCommand::new(store.clone()),
886                ),
887                Box::new(crate::nu::commands::append_command::AppendCommand::new(
888                    store.clone(),
889                    serde_json::Value::Null,
890                )),
891            ])
892            .unwrap();
893
894        // Test simple string expression
895        let result = engine
896            .eval(PipelineData::empty(), r#""hello world""#.to_string())
897            .unwrap();
898
899        match result {
900            PipelineData::Value(value, ..) => {
901                let text = value.into_string().unwrap();
902                assert_eq!(text, "hello world");
903            }
904            _ => panic!("Expected Value, got {:?}", result),
905        }
906
907        // Test simple math expression - result should be an integer
908        let result = engine
909            .eval(PipelineData::empty(), "2 + 3".to_string())
910            .unwrap();
911
912        match result {
913            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
914                assert_eq!(val, 5);
915            }
916            _ => panic!("Expected Int Value, got {:?}", result),
917        }
918    }
919}