xs/
api.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
27type BoxError = Box<dyn std::error::Error + Send + Sync>;
28type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
/// Wire format used when streaming frames back to the client.
#[derive(Clone, Debug, PartialEq)]
enum AcceptType {
    /// One JSON document per line (`application/x-ndjson`).
    Ndjson,
    /// Server-sent events (`text/event-stream`).
    EventStream,
}
35
36enum Routes {
37    StreamCat {
38        accept_type: AcceptType,
39        options: ReadOptions,
40    },
41    StreamAppend {
42        topic: String,
43        ttl: Option<TTL>,
44    },
45    LastGet {
46        topic: String,
47        follow: bool,
48    },
49    StreamItemGet(Scru128Id),
50    StreamItemRemove(Scru128Id),
51    CasGet(ssri::Integrity),
52    CasPost,
53    Import,
54    Eval,
55    Version,
56    NotFound,
57    BadRequest(String),
58}
59
60/// Validates an Integrity object to ensure all its hashes are properly formatted
61fn validate_integrity(integrity: &ssri::Integrity) -> bool {
62    // Check if there are any hashes
63    if integrity.hashes.is_empty() {
64        return false;
65    }
66
67    // For each hash, check if it has a valid base64-encoded digest
68    for hash in &integrity.hashes {
69        // Check if digest is valid base64 using the modern API
70        if base64::engine::general_purpose::STANDARD
71            .decode(&hash.digest)
72            .is_err()
73        {
74            return false;
75        }
76    }
77
78    true
79}
80
81fn match_route(
82    method: &Method,
83    path: &str,
84    headers: &hyper::HeaderMap,
85    query: Option<&str>,
86) -> Routes {
87    let params: HashMap<String, String> =
88        url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
89            .into_owned()
90            .collect();
91
92    match (method, path) {
93        (&Method::GET, "/version") => Routes::Version,
94
95        (&Method::GET, "/") => {
96            let accept_type = match headers.get(ACCEPT) {
97                Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
98                _ => AcceptType::Ndjson,
99            };
100
101            let options = ReadOptions::from_query(query);
102
103            match options {
104                Ok(options) => Routes::StreamCat {
105                    accept_type,
106                    options,
107                },
108                Err(e) => Routes::BadRequest(e.to_string()),
109            }
110        }
111
112        (&Method::GET, p) if p.starts_with("/last/") => {
113            let topic = p.strip_prefix("/last/").unwrap().to_string();
114            let follow = params.contains_key("follow");
115            Routes::LastGet { topic, follow }
116        }
117
118        (&Method::GET, p) if p.starts_with("/cas/") => {
119            if let Some(hash) = p.strip_prefix("/cas/") {
120                match ssri::Integrity::from_str(hash) {
121                    Ok(integrity) => {
122                        if validate_integrity(&integrity) {
123                            Routes::CasGet(integrity)
124                        } else {
125                            Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
126                        }
127                    }
128                    Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
129                }
130            } else {
131                Routes::NotFound
132            }
133        }
134
135        (&Method::POST, "/cas") => Routes::CasPost,
136        (&Method::POST, "/import") => Routes::Import,
137        (&Method::POST, "/eval") => Routes::Eval,
138
139        (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
140            Ok(id) => Routes::StreamItemGet(id),
141            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
142        },
143
144        (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
145            Ok(id) => Routes::StreamItemRemove(id),
146            Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
147        },
148
149        (&Method::POST, path) if path.starts_with("/append/") => {
150            let topic = path.strip_prefix("/append/").unwrap().to_string();
151
152            match TTL::from_query(query) {
153                Ok(ttl) => Routes::StreamAppend {
154                    topic,
155                    ttl: Some(ttl),
156                },
157                Err(e) => Routes::BadRequest(e.to_string()),
158            }
159        }
160
161        _ => Routes::NotFound,
162    }
163}
164
165async fn handle(
166    mut store: Store,
167    _engine: nu::Engine, // TODO: potentially vestigial, will .process come back?
168    req: Request<hyper::body::Incoming>,
169) -> HTTPResult {
170    let method = req.method();
171    let path = req.uri().path();
172    let headers = req.headers().clone();
173    let query = req.uri().query();
174
175    let res = match match_route(method, path, &headers, query) {
176        Routes::Version => handle_version().await,
177
178        Routes::StreamCat {
179            accept_type,
180            options,
181        } => handle_stream_cat(&mut store, options, accept_type).await,
182
183        Routes::StreamAppend { topic, ttl } => {
184            handle_stream_append(&mut store, req, topic, ttl).await
185        }
186
187        Routes::CasGet(hash) => {
188            let reader = store.cas_reader(hash).await?;
189            let stream = ReaderStream::new(reader);
190
191            let stream = stream.map(|frame| {
192                let frame = frame.unwrap();
193                Ok(hyper::body::Frame::data(frame))
194            });
195
196            let body = StreamBody::new(stream).boxed();
197            Ok(Response::new(body))
198        }
199
200        Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
201
202        Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
203
204        Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
205
206        Routes::LastGet { topic, follow } => handle_last_get(&store, &topic, follow).await,
207
208        Routes::Import => handle_import(&mut store, req.into_body()).await,
209
210        Routes::Eval => handle_eval(&store, req.into_body()).await,
211
212        Routes::NotFound => response_404(),
213        Routes::BadRequest(msg) => response_400(msg),
214    };
215
216    res.or_else(|e| response_500(e.to_string()))
217}
218
219async fn handle_stream_cat(
220    store: &mut Store,
221    options: ReadOptions,
222    accept_type: AcceptType,
223) -> HTTPResult {
224    let rx = store.read(options).await;
225    let stream = ReceiverStream::new(rx);
226
227    let accept_type_clone = accept_type.clone();
228    let stream = stream.map(move |frame| {
229        let bytes = match accept_type_clone {
230            AcceptType::Ndjson => {
231                let mut encoded = serde_json::to_vec(&frame).unwrap();
232                encoded.push(b'\n');
233                encoded
234            }
235            AcceptType::EventStream => format!(
236                "id: {id}\ndata: {data}\n\n",
237                id = frame.id,
238                data = serde_json::to_string(&frame).unwrap_or_default()
239            )
240            .into_bytes(),
241        };
242        Ok(hyper::body::Frame::data(Bytes::from(bytes)))
243    });
244
245    let body = StreamBody::new(stream).boxed();
246
247    let content_type = match accept_type {
248        AcceptType::Ndjson => "application/x-ndjson",
249        AcceptType::EventStream => "text/event-stream",
250    };
251
252    Ok(Response::builder()
253        .status(StatusCode::OK)
254        .header("Content-Type", content_type)
255        .body(body)?)
256}
257
258async fn handle_stream_append(
259    store: &mut Store,
260    req: Request<hyper::body::Incoming>,
261    topic: String,
262    ttl: Option<TTL>,
263) -> HTTPResult {
264    let (parts, mut body) = req.into_parts();
265
266    let hash = {
267        let mut writer = store.cas_writer().await?;
268        let mut bytes_written = 0;
269
270        while let Some(frame) = body.frame().await {
271            if let Ok(data) = frame?.into_data() {
272                writer.write_all(&data).await?;
273                bytes_written += data.len();
274            }
275        }
276
277        if bytes_written > 0 {
278            Some(writer.commit().await?)
279        } else {
280            None
281        }
282    };
283
284    let meta = match parts
285        .headers
286        .get("xs-meta")
287        .map(|x| x.to_str())
288        .transpose()
289        .unwrap()
290        .map(|s| {
291            // First decode the Base64-encoded string
292            base64::prelude::BASE64_STANDARD
293                .decode(s)
294                .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
295                .and_then(|decoded| {
296                    // Then parse the decoded bytes as UTF-8 string
297                    String::from_utf8(decoded)
298                        .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
299                        .and_then(|json_str| {
300                            // Finally parse the UTF-8 string as JSON
301                            serde_json::from_str(&json_str)
302                                .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
303                        })
304                })
305        })
306        .transpose()
307    {
308        Ok(meta) => meta,
309        Err(e) => return response_400(e.to_string()),
310    };
311
312    let frame = store.append(
313        Frame::builder(topic)
314            .maybe_hash(hash)
315            .maybe_meta(meta)
316            .maybe_ttl(ttl)
317            .build(),
318    )?;
319
320    Ok(Response::builder()
321        .status(StatusCode::OK)
322        .header("Content-Type", "application/json")
323        .body(full(serde_json::to_string(&frame).unwrap()))?)
324}
325
326async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
327    let hash = {
328        let mut writer = store.cas_writer().await?;
329        let mut bytes_written = 0;
330
331        while let Some(frame) = body.frame().await {
332            if let Ok(data) = frame?.into_data() {
333                writer.write_all(&data).await?;
334                bytes_written += data.len();
335            }
336        }
337
338        if bytes_written == 0 {
339            return response_400("Empty body".to_string());
340        }
341
342        writer.commit().await?
343    };
344
345    Ok(Response::builder()
346        .status(StatusCode::OK)
347        .header("Content-Type", "text/plain")
348        .body(full(hash.to_string()))?)
349}
350
351async fn handle_version() -> HTTPResult {
352    let version = env!("CARGO_PKG_VERSION");
353    let version_info = serde_json::json!({ "version": version });
354    Ok(Response::builder()
355        .status(StatusCode::OK)
356        .header("Content-Type", "application/json")
357        .body(full(serde_json::to_string(&version_info).unwrap()))?)
358}
359
360pub async fn serve(
361    store: Store,
362    engine: nu::Engine,
363    expose: Option<String>,
364) -> Result<(), BoxError> {
365    let path = store.path.join("sock").to_string_lossy().to_string();
366    let listener = Listener::bind(&path).await?;
367
368    let mut listeners = vec![listener];
369    let mut expose_meta = None;
370
371    if let Some(expose) = expose {
372        let expose_listener = Listener::bind(&expose).await?;
373
374        // Check if this is an iroh listener and get the ticket
375        if let Some(ticket) = expose_listener.get_ticket() {
376            expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
377        } else {
378            expose_meta = Some(serde_json::json!({"expose": expose}));
379        }
380
381        listeners.push(expose_listener);
382    }
383
384    if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
385        tracing::error!("Failed to append xs.start frame: {}", e);
386    }
387
388    let mut tasks = Vec::new();
389    for listener in listeners {
390        let store = store.clone();
391        let engine = engine.clone();
392        let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
393        tasks.push(task);
394    }
395
396    // TODO: graceful shutdown and error handling
397    // Wait for all listener tasks to complete (or until the first error)
398    for task in tasks {
399        task.await??;
400    }
401
402    Ok(())
403}
404
405async fn listener_loop(
406    mut listener: Listener,
407    store: Store,
408    engine: nu::Engine,
409) -> Result<(), BoxError> {
410    loop {
411        let (stream, _) = listener.accept().await?;
412        let io = TokioIo::new(stream);
413        let store = store.clone();
414        let engine = engine.clone();
415        tokio::task::spawn(async move {
416            if let Err(err) = http1::Builder::new()
417                .serve_connection(
418                    io,
419                    service_fn(move |req| handle(store.clone(), engine.clone(), req)),
420                )
421                .await
422            {
423                // Match against the error kind to selectively ignore `NotConnected` errors
424                if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
425                    source
426                        .downcast_ref::<std::io::Error>()
427                        .map(|io_err| io_err.kind())
428                }) {
429                    // ignore the NotConnected error, hyper's way of saying the client disconnected
430                } else {
431                    // todo, Handle or log other errors
432                    tracing::error!("TBD: {:?}", err);
433                }
434            }
435        });
436    }
437}
438
439fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
440    if let Some(frame) = frame {
441        Ok(Response::builder()
442            .status(StatusCode::OK)
443            .header("Content-Type", "application/json")
444            .body(full(serde_json::to_string(&frame).unwrap()))?)
445    } else {
446        response_404()
447    }
448}
449
450async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
451    match store.remove(&id) {
452        Ok(()) => Ok(Response::builder()
453            .status(StatusCode::NO_CONTENT)
454            .body(empty())?),
455        Err(e) => {
456            tracing::error!("Failed to remove item {}: {:?}", id, e);
457
458            Ok(Response::builder()
459                .status(StatusCode::INTERNAL_SERVER_ERROR)
460                .body(full("internal-error"))?)
461        }
462    }
463}
464
465async fn handle_last_get(store: &Store, topic: &str, follow: bool) -> HTTPResult {
466    let current_head = store.last(topic);
467
468    if !follow {
469        return response_frame_or_404(current_head);
470    }
471
472    let rx = store
473        .read(
474            ReadOptions::builder()
475                .follow(FollowOption::On)
476                .new(true)
477                .maybe_after(current_head.as_ref().map(|f| f.id))
478                .build(),
479        )
480        .await;
481
482    let topic = topic.to_string();
483    let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
484        .filter(move |frame| frame.topic == topic)
485        .map(|frame| {
486            let mut bytes = serde_json::to_vec(&frame).unwrap();
487            bytes.push(b'\n');
488            Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
489        });
490
491    let body = if let Some(frame) = current_head {
492        let mut head_bytes = serde_json::to_vec(&frame).unwrap();
493        head_bytes.push(b'\n');
494        let head_chunk = Ok(hyper::body::Frame::data(Bytes::from(head_bytes)));
495        StreamBody::new(futures::stream::once(async { head_chunk }).chain(stream)).boxed()
496    } else {
497        StreamBody::new(stream).boxed()
498    };
499
500    Ok(Response::builder()
501        .status(StatusCode::OK)
502        .header("Content-Type", "application/x-ndjson")
503        .body(body)?)
504}
505
506async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
507    let bytes = body.collect().await?.to_bytes();
508    let frame: Frame = match serde_json::from_slice(&bytes) {
509        Ok(frame) => frame,
510        Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
511    };
512
513    store.insert_frame(&frame)?;
514
515    Ok(Response::builder()
516        .status(StatusCode::OK)
517        .header("Content-Type", "application/json")
518        .body(full(serde_json::to_string(&frame).unwrap()))?)
519}
520
521fn response_404() -> HTTPResult {
522    Ok(Response::builder()
523        .status(StatusCode::NOT_FOUND)
524        .body(empty())?)
525}
526
527fn response_400(message: String) -> HTTPResult {
528    let body = full(message);
529    Ok(Response::builder()
530        .status(StatusCode::BAD_REQUEST)
531        .body(body)?)
532}
533
534fn response_500(message: String) -> HTTPResult {
535    let body = full(message);
536    Ok(Response::builder()
537        .status(StatusCode::INTERNAL_SERVER_ERROR)
538        .body(body)?)
539}
540
541fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
542    Full::new(chunk.into())
543        .map_err(|never| match never {})
544        .boxed()
545}
546
547fn empty() -> BoxBody<Bytes, BoxError> {
548    Empty::<Bytes>::new()
549        .map_err(|never| match never {})
550        .boxed()
551}
552
553async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
554    // Read the script from the request body
555    let bytes = body.collect().await?.to_bytes();
556    let script =
557        String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
558
559    // Create nushell engine with store helper commands
560    let mut engine =
561        nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
562
563    // Add core commands
564    nu::add_core_commands(&mut engine, store)
565        .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
566
567    // Add streaming commands
568    engine
569        .add_commands(vec![
570            Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
571                store.clone(),
572            )),
573            Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
574                store.clone(),
575            )),
576            Box::new(nu::commands::append_command::AppendCommand::new(
577                store.clone(),
578                serde_json::Value::Null,
579            )),
580        ])
581        .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
582
583    // Execute the script
584    let result = engine
585        .eval(nu_protocol::PipelineData::empty(), script)
586        .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
587
588    // Format output based on PipelineData type according to spec
589    match result {
590        nu_protocol::PipelineData::ByteStream(stream, ..) => {
591            // ByteStream → raw bytes with proper streaming using channel pattern
592            if let Some(mut reader) = stream.reader() {
593                use std::io::Read;
594
595                let (tx, rx) = tokio::sync::mpsc::channel(16);
596
597                // Spawn sync task to read from nushell Reader and send to channel
598                std::thread::spawn(move || {
599                    let mut buffer = [0u8; 8192];
600                    loop {
601                        match reader.read(&mut buffer) {
602                            Ok(0) => break, // EOF
603                            Ok(n) => {
604                                let chunk = Bytes::copy_from_slice(&buffer[..n]);
605                                if tx
606                                    .blocking_send(Ok(hyper::body::Frame::data(chunk)))
607                                    .is_err()
608                                {
609                                    break;
610                                }
611                            }
612                            Err(e) => {
613                                let _ = tx.blocking_send(Err(
614                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>
615                                ));
616                                break;
617                            }
618                        }
619                    }
620                });
621
622                let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
623                let body = StreamBody::new(stream).boxed();
624                Ok(Response::builder()
625                    .status(StatusCode::OK)
626                    .header("Content-Type", "application/octet-stream")
627                    .body(body)?)
628            } else {
629                // No reader available, return empty response
630                Ok(Response::builder()
631                    .status(StatusCode::OK)
632                    .header("Content-Type", "application/octet-stream")
633                    .body(empty())?)
634            }
635        }
636        nu_protocol::PipelineData::ListStream(stream, ..) => {
637            // ListStream → JSONL stream with proper streaming using channel pattern
638            let (tx, rx) = tokio::sync::mpsc::channel(16);
639
640            // Spawn sync task to iterate stream and send JSONL to channel
641            std::thread::spawn(move || {
642                for value in stream.into_iter() {
643                    let json = nu::value_to_json(&value);
644                    match serde_json::to_vec(&json) {
645                        Ok(mut json_bytes) => {
646                            json_bytes.push(b'\n'); // Add newline for JSONL
647                            let chunk = Bytes::from(json_bytes);
648                            if tx
649                                .blocking_send(Ok(hyper::body::Frame::data(chunk)))
650                                .is_err()
651                            {
652                                break;
653                            }
654                        }
655                        Err(e) => {
656                            let _ = tx.blocking_send(Err(
657                                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
658                            ));
659                            break;
660                        }
661                    }
662                }
663            });
664
665            let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
666            let body = StreamBody::new(stream).boxed();
667            Ok(Response::builder()
668                .status(StatusCode::OK)
669                .header("Content-Type", "application/x-ndjson")
670                .body(body)?)
671        }
672        nu_protocol::PipelineData::Value(value, ..) => {
673            match &value {
674                nu_protocol::Value::String { .. }
675                | nu_protocol::Value::Int { .. }
676                | nu_protocol::Value::Float { .. }
677                | nu_protocol::Value::Bool { .. } => {
678                    // Single primitive value → raw text
679                    let text = match value {
680                        nu_protocol::Value::String { val, .. } => val.clone(),
681                        nu_protocol::Value::Int { val, .. } => val.to_string(),
682                        nu_protocol::Value::Float { val, .. } => val.to_string(),
683                        nu_protocol::Value::Bool { val, .. } => val.to_string(),
684                        _ => value.into_string().unwrap_or_else(|_| "".to_string()),
685                    };
686                    Ok(Response::builder()
687                        .status(StatusCode::OK)
688                        .header("Content-Type", "text/plain")
689                        .body(full(text))?)
690                }
691                _ => {
692                    // Single structured value → JSON
693                    let json = nu::value_to_json(&value);
694                    let json_string = serde_json::to_string(&json)
695                        .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
696                    Ok(Response::builder()
697                        .status(StatusCode::OK)
698                        .header("Content-Type", "application/json")
699                        .body(full(json_string))?)
700                }
701            }
702        }
703        nu_protocol::PipelineData::Empty => {
704            // Empty → nothing
705            Ok(Response::builder()
706                .status(StatusCode::NO_CONTENT)
707                .body(empty())?)
708        }
709    }
710}
711
#[cfg(test)]
mod tests {
    use super::*;

    /// `GET /last/<topic>` follows only when a `follow` query parameter is present.
    #[test]
    fn test_match_route_last_follow() {
        let headers = hyper::HeaderMap::new();

        let without_query = match_route(&Method::GET, "/last/test", &headers, None);
        assert!(matches!(
            without_query,
            Routes::LastGet { topic, follow: false } if topic == "test"
        ));

        let with_follow = match_route(&Method::GET, "/last/test", &headers, Some("follow=true"));
        assert!(matches!(
            with_follow,
            Routes::LastGet { topic, follow: true } if topic == "test"
        ));
    }

    /// Exercises the engine setup used by the eval handler by driving the
    /// engine directly against a temporary store.
    #[tokio::test]
    async fn test_handle_eval_logic() {
        use crate::nu::commands::append_command::AppendCommand;
        use crate::nu::commands::cat_stream_command::CatStreamCommand;
        use crate::nu::commands::last_stream_command::LastStreamCommand;
        use crate::nu::Engine;
        use crate::store::Store;
        use nu_protocol::PipelineData;

        // Temporary store backing the commands
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Store::new(temp_dir.path().to_path_buf());

        // Engine with core and streaming commands
        let mut engine = Engine::new().unwrap();
        crate::nu::add_core_commands(&mut engine, &store).unwrap();
        engine
            .add_commands(vec![
                Box::new(CatStreamCommand::new(store.clone())),
                Box::new(LastStreamCommand::new(store.clone())),
                Box::new(AppendCommand::new(store.clone(), serde_json::Value::Null)),
            ])
            .unwrap();

        // A string literal evaluates to a string value
        let greeting = engine
            .eval(PipelineData::empty(), r#""hello world""#.to_string())
            .unwrap();
        match greeting {
            PipelineData::Value(value, ..) => {
                assert_eq!(value.into_string().unwrap(), "hello world");
            }
            other => panic!("Expected Value, got {:?}", other),
        }

        // Simple arithmetic evaluates to an integer value
        let sum = engine
            .eval(PipelineData::empty(), "2 + 3".to_string())
            .unwrap();
        match sum {
            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => assert_eq!(val, 5),
            other => panic!("Expected Int Value, got {:?}", other),
        }
    }
}