xs/
api.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
27type BoxError = Box<dyn std::error::Error + Send + Sync>;
28type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
30#[derive(Debug, PartialEq, Clone)]
31enum AcceptType {
32    Ndjson,
33    EventStream,
34}
35
36enum Routes {
37    StreamCat {
38        accept_type: AcceptType,
39        options: ReadOptions,
40    },
41    StreamAppend {
42        topic: String,
43        ttl: Option<TTL>,
44        context_id: Scru128Id,
45    },
46    HeadGet {
47        topic: String,
48        follow: bool,
49        context_id: Scru128Id,
50    },
51    StreamItemGet(Scru128Id),
52    StreamItemRemove(Scru128Id),
53    CasGet(ssri::Integrity),
54    CasPost,
55    Import,
56    Exec,
57    Version,
58    NotFound,
59    BadRequest(String),
60}
61
62/// Validates an Integrity object to ensure all its hashes are properly formatted
63fn validate_integrity(integrity: &ssri::Integrity) -> bool {
64    // Check if there are any hashes
65    if integrity.hashes.is_empty() {
66        return false;
67    }
68
69    // For each hash, check if it has a valid base64-encoded digest
70    for hash in &integrity.hashes {
71        // Check if digest is valid base64 using the modern API
72        if base64::engine::general_purpose::STANDARD
73            .decode(&hash.digest)
74            .is_err()
75        {
76            return false;
77        }
78    }
79
80    true
81}
82
83fn match_route(
84    method: &Method,
85    path: &str,
86    headers: &hyper::HeaderMap,
87    query: Option<&str>,
88) -> Routes {
89    let params: HashMap<String, String> =
90        url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
91            .into_owned()
92            .collect();
93
94    match (method, path) {
95        (&Method::GET, "/version") => Routes::Version,
96
97        (&Method::GET, "/") => {
98            let accept_type = match headers.get(ACCEPT) {
99                Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
100                _ => AcceptType::Ndjson,
101            };
102
103            let options = ReadOptions::from_query(query);
104
105            match options {
106                Ok(options) => Routes::StreamCat {
107                    accept_type,
108                    options,
109                },
110                Err(e) => Routes::BadRequest(e.to_string()),
111            }
112        }
113
114        (&Method::GET, p) if p.starts_with("/head/") => {
115            let topic = p.strip_prefix("/head/").unwrap().to_string();
116            let follow = params.contains_key("follow");
117            let context_id = match params.get("context") {
118                None => crate::store::ZERO_CONTEXT,
119                Some(ctx) => match ctx.parse() {
120                    Ok(id) => id,
121                    Err(e) => return Routes::BadRequest(format!("Invalid context ID: {e}")),
122                },
123            };
124            Routes::HeadGet {
125                topic,
126                follow,
127                context_id,
128            }
129        }
130
131        (&Method::GET, p) if p.starts_with("/cas/") => {
132            if let Some(hash) = p.strip_prefix("/cas/") {
133                match ssri::Integrity::from_str(hash) {
134                    Ok(integrity) => {
135                        if validate_integrity(&integrity) {
136                            Routes::CasGet(integrity)
137                        } else {
138                            Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
139                        }
140                    }
141                    Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
142                }
143            } else {
144                Routes::NotFound
145            }
146        }
147
148        (&Method::POST, "/cas") => Routes::CasPost,
149        (&Method::POST, "/import") => Routes::Import,
150        (&Method::POST, "/exec") => Routes::Exec,
151
152        (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
153            Ok(id) => Routes::StreamItemGet(id),
154            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
155        },
156
157        (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
158            Ok(id) => Routes::StreamItemRemove(id),
159            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
160        },
161
162        (&Method::POST, path) if path.starts_with('/') => {
163            let topic = path.trim_start_matches('/').to_string();
164            let context_id = match params.get("context") {
165                None => crate::store::ZERO_CONTEXT,
166                Some(ctx) => match ctx.parse() {
167                    Ok(id) => id,
168                    Err(e) => return Routes::BadRequest(format!("Invalid context ID: {e}")),
169                },
170            };
171
172            match TTL::from_query(query) {
173                Ok(ttl) => Routes::StreamAppend {
174                    topic,
175                    ttl: Some(ttl),
176                    context_id,
177                },
178                Err(e) => Routes::BadRequest(e.to_string()),
179            }
180        }
181
182        _ => Routes::NotFound,
183    }
184}
185
186async fn handle(
187    mut store: Store,
188    _engine: nu::Engine, // TODO: potentially vestigial, will .process come back?
189    req: Request<hyper::body::Incoming>,
190) -> HTTPResult {
191    let method = req.method();
192    let path = req.uri().path();
193    let headers = req.headers().clone();
194    let query = req.uri().query();
195
196    let res = match match_route(method, path, &headers, query) {
197        Routes::Version => handle_version().await,
198
199        Routes::StreamCat {
200            accept_type,
201            options,
202        } => handle_stream_cat(&mut store, options, accept_type).await,
203
204        Routes::StreamAppend {
205            topic,
206            ttl,
207            context_id,
208        } => handle_stream_append(&mut store, req, topic, ttl, context_id).await,
209
210        Routes::CasGet(hash) => {
211            let reader = store.cas_reader(hash).await?;
212            let stream = ReaderStream::new(reader);
213
214            let stream = stream.map(|frame| {
215                let frame = frame.unwrap();
216                Ok(hyper::body::Frame::data(frame))
217            });
218
219            let body = StreamBody::new(stream).boxed();
220            Ok(Response::new(body))
221        }
222
223        Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
224
225        Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
226
227        Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
228
229        Routes::HeadGet {
230            topic,
231            follow,
232            context_id,
233        } => handle_head_get(&store, &topic, follow, context_id).await,
234
235        Routes::Import => handle_import(&mut store, req.into_body()).await,
236
237        Routes::Exec => handle_exec(&store, req.into_body()).await,
238
239        Routes::NotFound => response_404(),
240        Routes::BadRequest(msg) => response_400(msg),
241    };
242
243    res.or_else(|e| response_500(e.to_string()))
244}
245
246async fn handle_stream_cat(
247    store: &mut Store,
248    options: ReadOptions,
249    accept_type: AcceptType,
250) -> HTTPResult {
251    let rx = store.read(options).await;
252    let stream = ReceiverStream::new(rx);
253
254    let accept_type_clone = accept_type.clone();
255    let stream = stream.map(move |frame| {
256        let bytes = match accept_type_clone {
257            AcceptType::Ndjson => {
258                let mut encoded = serde_json::to_vec(&frame).unwrap();
259                encoded.push(b'\n');
260                encoded
261            }
262            AcceptType::EventStream => format!(
263                "id: {id}\ndata: {data}\n\n",
264                id = frame.id,
265                data = serde_json::to_string(&frame).unwrap_or_default()
266            )
267            .into_bytes(),
268        };
269        Ok(hyper::body::Frame::data(Bytes::from(bytes)))
270    });
271
272    let body = StreamBody::new(stream).boxed();
273
274    let content_type = match accept_type {
275        AcceptType::Ndjson => "application/x-ndjson",
276        AcceptType::EventStream => "text/event-stream",
277    };
278
279    Ok(Response::builder()
280        .status(StatusCode::OK)
281        .header("Content-Type", content_type)
282        .body(body)?)
283}
284
285async fn handle_stream_append(
286    store: &mut Store,
287    req: Request<hyper::body::Incoming>,
288    topic: String,
289    ttl: Option<TTL>,
290    context_id: Scru128Id,
291) -> HTTPResult {
292    let (parts, mut body) = req.into_parts();
293
294    let hash = {
295        let mut writer = store.cas_writer().await?;
296        let mut bytes_written = 0;
297
298        while let Some(frame) = body.frame().await {
299            if let Ok(data) = frame?.into_data() {
300                writer.write_all(&data).await?;
301                bytes_written += data.len();
302            }
303        }
304
305        if bytes_written > 0 {
306            Some(writer.commit().await?)
307        } else {
308            None
309        }
310    };
311
312    let meta = match parts
313        .headers
314        .get("xs-meta")
315        .map(|x| x.to_str())
316        .transpose()
317        .unwrap()
318        .map(|s| {
319            // First decode the Base64-encoded string
320            base64::prelude::BASE64_STANDARD
321                .decode(s)
322                .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
323                .and_then(|decoded| {
324                    // Then parse the decoded bytes as UTF-8 string
325                    String::from_utf8(decoded)
326                        .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
327                        .and_then(|json_str| {
328                            // Finally parse the UTF-8 string as JSON
329                            serde_json::from_str(&json_str)
330                                .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
331                        })
332                })
333        })
334        .transpose()
335    {
336        Ok(meta) => meta,
337        Err(e) => return response_400(e.to_string()),
338    };
339
340    let frame = store.append(
341        Frame::builder(topic, context_id)
342            .maybe_hash(hash)
343            .maybe_meta(meta)
344            .maybe_ttl(ttl)
345            .build(),
346    )?;
347
348    Ok(Response::builder()
349        .status(StatusCode::OK)
350        .header("Content-Type", "application/json")
351        .body(full(serde_json::to_string(&frame).unwrap()))?)
352}
353
354async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
355    let hash = {
356        let mut writer = store.cas_writer().await?;
357        let mut bytes_written = 0;
358
359        while let Some(frame) = body.frame().await {
360            if let Ok(data) = frame?.into_data() {
361                writer.write_all(&data).await?;
362                bytes_written += data.len();
363            }
364        }
365
366        if bytes_written == 0 {
367            return response_400("Empty body".to_string());
368        }
369
370        writer.commit().await?
371    };
372
373    Ok(Response::builder()
374        .status(StatusCode::OK)
375        .header("Content-Type", "text/plain")
376        .body(full(hash.to_string()))?)
377}
378
379async fn handle_version() -> HTTPResult {
380    let version = env!("CARGO_PKG_VERSION");
381    let version_info = serde_json::json!({ "version": version });
382    Ok(Response::builder()
383        .status(StatusCode::OK)
384        .header("Content-Type", "application/json")
385        .body(full(serde_json::to_string(&version_info).unwrap()))?)
386}
387
388pub async fn serve(
389    store: Store,
390    engine: nu::Engine,
391    expose: Option<String>,
392) -> Result<(), BoxError> {
393    let path = store.path.join("sock").to_string_lossy().to_string();
394    let listener = Listener::bind(&path).await?;
395
396    let mut listeners = vec![listener];
397    let mut expose_meta = None;
398
399    if let Some(expose) = expose {
400        let expose_listener = Listener::bind(&expose).await?;
401
402        // Check if this is an iroh listener and get the ticket
403        if let Some(ticket) = expose_listener.get_ticket() {
404            expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
405        } else {
406            expose_meta = Some(serde_json::json!({"expose": expose}));
407        }
408
409        listeners.push(expose_listener);
410    }
411
412    if let Err(e) = store.append(
413        Frame::builder("xs.start", store::ZERO_CONTEXT)
414            .maybe_meta(expose_meta)
415            .build(),
416    ) {
417        tracing::error!("Failed to append xs.start frame: {}", e);
418    }
419
420    let mut tasks = Vec::new();
421    for listener in listeners {
422        let store = store.clone();
423        let engine = engine.clone();
424        let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
425        tasks.push(task);
426    }
427
428    // TODO: graceful shutdown and error handling
429    // Wait for all listener tasks to complete (or until the first error)
430    for task in tasks {
431        task.await??;
432    }
433
434    Ok(())
435}
436
437async fn listener_loop(
438    mut listener: Listener,
439    store: Store,
440    engine: nu::Engine,
441) -> Result<(), BoxError> {
442    loop {
443        let (stream, _) = listener.accept().await?;
444        let io = TokioIo::new(stream);
445        let store = store.clone();
446        let engine = engine.clone();
447        tokio::task::spawn(async move {
448            if let Err(err) = http1::Builder::new()
449                .serve_connection(
450                    io,
451                    service_fn(move |req| handle(store.clone(), engine.clone(), req)),
452                )
453                .await
454            {
455                // Match against the error kind to selectively ignore `NotConnected` errors
456                if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
457                    source
458                        .downcast_ref::<std::io::Error>()
459                        .map(|io_err| io_err.kind())
460                }) {
461                    // ignore the NotConnected error, hyper's way of saying the client disconnected
462                } else {
463                    // todo, Handle or log other errors
464                    tracing::error!("TBD: {:?}", err);
465                }
466            }
467        });
468    }
469}
470
471fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
472    if let Some(frame) = frame {
473        Ok(Response::builder()
474            .status(StatusCode::OK)
475            .header("Content-Type", "application/json")
476            .body(full(serde_json::to_string(&frame).unwrap()))?)
477    } else {
478        response_404()
479    }
480}
481
482async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
483    match store.remove(&id) {
484        Ok(()) => Ok(Response::builder()
485            .status(StatusCode::NO_CONTENT)
486            .body(empty())?),
487        Err(e) => {
488            tracing::error!("Failed to remove item {}: {:?}", id, e);
489
490            Ok(Response::builder()
491                .status(StatusCode::INTERNAL_SERVER_ERROR)
492                .body(full("internal-error"))?)
493        }
494    }
495}
496
497async fn handle_head_get(
498    store: &Store,
499    topic: &str,
500    follow: bool,
501    context_id: Scru128Id,
502) -> HTTPResult {
503    let current_head = store.head(topic, context_id);
504
505    if !follow {
506        return response_frame_or_404(current_head);
507    }
508
509    let rx = store
510        .read(
511            ReadOptions::builder()
512                .follow(FollowOption::On)
513                .tail(true)
514                .maybe_last_id(current_head.as_ref().map(|f| f.id))
515                .build(),
516        )
517        .await;
518
519    let topic = topic.to_string();
520    let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
521        .filter(move |frame| frame.topic == topic)
522        .map(|frame| {
523            let mut bytes = serde_json::to_vec(&frame).unwrap();
524            bytes.push(b'\n');
525            Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
526        });
527
528    let body = if let Some(frame) = current_head {
529        let mut head_bytes = serde_json::to_vec(&frame).unwrap();
530        head_bytes.push(b'\n');
531        let head_chunk = Ok(hyper::body::Frame::data(Bytes::from(head_bytes)));
532        StreamBody::new(futures::stream::once(async { head_chunk }).chain(stream)).boxed()
533    } else {
534        StreamBody::new(stream).boxed()
535    };
536
537    Ok(Response::builder()
538        .status(StatusCode::OK)
539        .header("Content-Type", "application/x-ndjson")
540        .body(body)?)
541}
542
543async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
544    let bytes = body.collect().await?.to_bytes();
545    let frame: Frame = match serde_json::from_slice(&bytes) {
546        Ok(frame) => frame,
547        Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
548    };
549
550    store.insert_frame(&frame)?;
551
552    Ok(Response::builder()
553        .status(StatusCode::OK)
554        .header("Content-Type", "application/json")
555        .body(full(serde_json::to_string(&frame).unwrap()))?)
556}
557
558fn response_404() -> HTTPResult {
559    Ok(Response::builder()
560        .status(StatusCode::NOT_FOUND)
561        .body(empty())?)
562}
563
564fn response_400(message: String) -> HTTPResult {
565    let body = full(message);
566    Ok(Response::builder()
567        .status(StatusCode::BAD_REQUEST)
568        .body(body)?)
569}
570
571fn response_500(message: String) -> HTTPResult {
572    let body = full(message);
573    Ok(Response::builder()
574        .status(StatusCode::INTERNAL_SERVER_ERROR)
575        .body(body)?)
576}
577
578fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
579    Full::new(chunk.into())
580        .map_err(|never| match never {})
581        .boxed()
582}
583
584fn empty() -> BoxBody<Bytes, BoxError> {
585    Empty::<Bytes>::new()
586        .map_err(|never| match never {})
587        .boxed()
588}
589
590async fn handle_exec(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
591    // Read the script from the request body
592    let bytes = body.collect().await?.to_bytes();
593    let script =
594        String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
595
596    // Create nushell engine with store helper commands
597    let mut engine =
598        nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
599
600    // Use system context for exec commands
601    let context_id = crate::store::ZERO_CONTEXT;
602
603    // Add core commands
604    nu::add_core_commands(&mut engine, store)
605        .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
606
607    // Add context-specific commands
608    engine
609        .add_commands(vec![
610            Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
611                store.clone(),
612                context_id,
613            )),
614            Box::new(nu::commands::head_stream_command::HeadStreamCommand::new(
615                store.clone(),
616                context_id,
617            )),
618            Box::new(nu::commands::append_command::AppendCommand::new(
619                store.clone(),
620                context_id,
621                serde_json::Value::Null,
622            )),
623        ])
624        .map_err(|e| format!("Failed to add context commands to engine: {e}"))?;
625
626    // Execute the script
627    let result = engine
628        .eval(nu_protocol::PipelineData::empty(), script)
629        .map_err(|e| format!("Script execution failed:\n{e}"))?;
630
631    // Format output based on PipelineData type according to spec
632    match result {
633        nu_protocol::PipelineData::ByteStream(stream, ..) => {
634            // ByteStream → raw bytes with proper streaming using channel pattern
635            if let Some(mut reader) = stream.reader() {
636                use std::io::Read;
637
638                let (tx, rx) = tokio::sync::mpsc::channel(16);
639
640                // Spawn sync task to read from nushell Reader and send to channel
641                std::thread::spawn(move || {
642                    let mut buffer = [0u8; 8192];
643                    loop {
644                        match reader.read(&mut buffer) {
645                            Ok(0) => break, // EOF
646                            Ok(n) => {
647                                let chunk = Bytes::copy_from_slice(&buffer[..n]);
648                                if tx
649                                    .blocking_send(Ok(hyper::body::Frame::data(chunk)))
650                                    .is_err()
651                                {
652                                    break;
653                                }
654                            }
655                            Err(e) => {
656                                let _ = tx.blocking_send(Err(
657                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>
658                                ));
659                                break;
660                            }
661                        }
662                    }
663                });
664
665                let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
666                let body = StreamBody::new(stream).boxed();
667                Ok(Response::builder()
668                    .status(StatusCode::OK)
669                    .header("Content-Type", "application/octet-stream")
670                    .body(body)?)
671            } else {
672                // No reader available, return empty response
673                Ok(Response::builder()
674                    .status(StatusCode::OK)
675                    .header("Content-Type", "application/octet-stream")
676                    .body(empty())?)
677            }
678        }
679        nu_protocol::PipelineData::ListStream(stream, ..) => {
680            // ListStream → JSONL stream with proper streaming using channel pattern
681            let (tx, rx) = tokio::sync::mpsc::channel(16);
682
683            // Spawn sync task to iterate stream and send JSONL to channel
684            std::thread::spawn(move || {
685                for value in stream.into_iter() {
686                    let json = nu::value_to_json(&value);
687                    match serde_json::to_vec(&json) {
688                        Ok(mut json_bytes) => {
689                            json_bytes.push(b'\n'); // Add newline for JSONL
690                            let chunk = Bytes::from(json_bytes);
691                            if tx
692                                .blocking_send(Ok(hyper::body::Frame::data(chunk)))
693                                .is_err()
694                            {
695                                break;
696                            }
697                        }
698                        Err(e) => {
699                            let _ = tx.blocking_send(Err(
700                                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
701                            ));
702                            break;
703                        }
704                    }
705                }
706            });
707
708            let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
709            let body = StreamBody::new(stream).boxed();
710            Ok(Response::builder()
711                .status(StatusCode::OK)
712                .header("Content-Type", "application/x-ndjson")
713                .body(body)?)
714        }
715        nu_protocol::PipelineData::Value(value, ..) => {
716            match &value {
717                nu_protocol::Value::String { .. }
718                | nu_protocol::Value::Int { .. }
719                | nu_protocol::Value::Float { .. }
720                | nu_protocol::Value::Bool { .. } => {
721                    // Single primitive value → raw text
722                    let text = match value {
723                        nu_protocol::Value::String { val, .. } => val.clone(),
724                        nu_protocol::Value::Int { val, .. } => val.to_string(),
725                        nu_protocol::Value::Float { val, .. } => val.to_string(),
726                        nu_protocol::Value::Bool { val, .. } => val.to_string(),
727                        _ => value.into_string().unwrap_or_else(|_| "".to_string()),
728                    };
729                    Ok(Response::builder()
730                        .status(StatusCode::OK)
731                        .header("Content-Type", "text/plain")
732                        .body(full(text))?)
733                }
734                _ => {
735                    // Single structured value → JSON
736                    let json = nu::value_to_json(&value);
737                    let json_string = serde_json::to_string(&json)
738                        .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
739                    Ok(Response::builder()
740                        .status(StatusCode::OK)
741                        .header("Content-Type", "application/json")
742                        .body(full(json_string))?)
743                }
744            }
745        }
746        nu_protocol::PipelineData::Empty => {
747            // Empty → nothing
748            Ok(Response::builder()
749                .status(StatusCode::NO_CONTENT)
750                .body(empty())?)
751        }
752    }
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758
759    #[test]
760    fn test_match_route_head_follow() {
761        let headers = hyper::HeaderMap::new();
762
763        assert!(matches!(
764            match_route(&Method::GET, "/head/test", &headers, None),
765            Routes::HeadGet { topic, follow: false, context_id: _ } if topic == "test"
766        ));
767
768        assert!(matches!(
769            match_route(&Method::GET, "/head/test", &headers, Some("follow=true")),
770            Routes::HeadGet { topic, follow: true, context_id: _ } if topic == "test"
771        ));
772    }
773
774    #[tokio::test]
775    async fn test_handle_exec_logic() {
776        // Test the core nushell execution logic by testing the engine directly
777        use crate::nu::Engine;
778        use crate::store::Store;
779        use nu_protocol::PipelineData;
780
781        // Create a temporary store for testing
782        let temp_dir = tempfile::tempdir().unwrap();
783        let store = Store::new(temp_dir.path().to_path_buf());
784
785        // Create nushell engine with store helper commands
786        let mut engine = Engine::new().unwrap();
787
788        // Use system context for exec commands
789        let context_id = crate::store::ZERO_CONTEXT;
790
791        // Add core commands
792        crate::nu::add_core_commands(&mut engine, &store).unwrap();
793
794        // Add context-specific commands
795        engine
796            .add_commands(vec![
797                Box::new(
798                    crate::nu::commands::cat_stream_command::CatStreamCommand::new(
799                        store.clone(),
800                        context_id,
801                    ),
802                ),
803                Box::new(
804                    crate::nu::commands::head_stream_command::HeadStreamCommand::new(
805                        store.clone(),
806                        context_id,
807                    ),
808                ),
809                Box::new(crate::nu::commands::append_command::AppendCommand::new(
810                    store.clone(),
811                    context_id,
812                    serde_json::Value::Null,
813                )),
814            ])
815            .unwrap();
816
817        // Test simple string expression
818        let result = engine
819            .eval(PipelineData::empty(), r#""hello world""#.to_string())
820            .unwrap();
821
822        match result {
823            PipelineData::Value(value, ..) => {
824                let text = value.into_string().unwrap();
825                assert_eq!(text, "hello world");
826            }
827            _ => panic!("Expected Value, got {:?}", result),
828        }
829
830        // Test simple math expression - result should be an integer
831        let result = engine
832            .eval(PipelineData::empty(), "2 + 3".to_string())
833            .unwrap();
834
835        match result {
836            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
837                assert_eq!(val, 5);
838            }
839            _ => panic!("Expected Int Value, got {:?}", result),
840        }
841    }
842}