1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
27type BoxError = Box<dyn std::error::Error + Send + Sync>;
28type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
30#[derive(Debug, PartialEq, Clone)]
31enum AcceptType {
32 Ndjson,
33 EventStream,
34}
35
36enum Routes {
37 StreamCat {
38 accept_type: AcceptType,
39 options: ReadOptions,
40 },
41 StreamAppend {
42 topic: String,
43 ttl: Option<TTL>,
44 },
45 LastGet {
46 topic: Option<String>,
47 last: usize,
48 follow: bool,
49 },
50 StreamItemGet(Scru128Id),
51 StreamItemRemove(Scru128Id),
52 CasGet(ssri::Integrity),
53 CasPost,
54 Import,
55 Eval,
56 Version,
57 NotFound,
58 BadRequest(String),
59}
60
61fn validate_integrity(integrity: &ssri::Integrity) -> bool {
63 if integrity.hashes.is_empty() {
65 return false;
66 }
67
68 for hash in &integrity.hashes {
70 if base64::engine::general_purpose::STANDARD
72 .decode(&hash.digest)
73 .is_err()
74 {
75 return false;
76 }
77 }
78
79 true
80}
81
82fn match_route(
83 method: &Method,
84 path: &str,
85 headers: &hyper::HeaderMap,
86 query: Option<&str>,
87) -> Routes {
88 let params: HashMap<String, String> =
89 url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
90 .into_owned()
91 .collect();
92
93 match (method, path) {
94 (&Method::GET, "/version") => Routes::Version,
95
96 (&Method::GET, "/") => {
97 let accept_type = match headers.get(ACCEPT) {
98 Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
99 _ => AcceptType::Ndjson,
100 };
101
102 let options = ReadOptions::from_query(query);
103
104 match options {
105 Ok(options) => Routes::StreamCat {
106 accept_type,
107 options,
108 },
109 Err(e) => Routes::BadRequest(e.to_string()),
110 }
111 }
112
113 (&Method::GET, "/last") => {
114 let follow = params.contains_key("follow");
115 let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
116 Routes::LastGet {
117 topic: None,
118 last,
119 follow,
120 }
121 }
122
123 (&Method::GET, p) if p.starts_with("/last/") => {
124 let topic = p.strip_prefix("/last/").unwrap().to_string();
125 let follow = params.contains_key("follow");
126 let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
127 Routes::LastGet {
128 topic: Some(topic),
129 last,
130 follow,
131 }
132 }
133
134 (&Method::GET, p) if p.starts_with("/cas/") => {
135 if let Some(hash) = p.strip_prefix("/cas/") {
136 match ssri::Integrity::from_str(hash) {
137 Ok(integrity) => {
138 if validate_integrity(&integrity) {
139 Routes::CasGet(integrity)
140 } else {
141 Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
142 }
143 }
144 Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
145 }
146 } else {
147 Routes::NotFound
148 }
149 }
150
151 (&Method::POST, "/cas") => Routes::CasPost,
152 (&Method::POST, "/import") => Routes::Import,
153 (&Method::POST, "/eval") => Routes::Eval,
154
155 (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
156 Ok(id) => Routes::StreamItemGet(id),
157 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
158 },
159
160 (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
161 Ok(id) => Routes::StreamItemRemove(id),
162 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
163 },
164
165 (&Method::POST, path) if path.starts_with("/append/") => {
166 let topic = path.strip_prefix("/append/").unwrap().to_string();
167
168 match TTL::from_query(query) {
169 Ok(ttl) => Routes::StreamAppend {
170 topic,
171 ttl: Some(ttl),
172 },
173 Err(e) => Routes::BadRequest(e.to_string()),
174 }
175 }
176
177 _ => Routes::NotFound,
178 }
179}
180
181async fn handle(
182 mut store: Store,
183 _engine: nu::Engine, req: Request<hyper::body::Incoming>,
185) -> HTTPResult {
186 let method = req.method();
187 let path = req.uri().path();
188 let headers = req.headers().clone();
189 let query = req.uri().query();
190
191 let res = match match_route(method, path, &headers, query) {
192 Routes::Version => handle_version().await,
193
194 Routes::StreamCat {
195 accept_type,
196 options,
197 } => handle_stream_cat(&mut store, options, accept_type).await,
198
199 Routes::StreamAppend { topic, ttl } => {
200 handle_stream_append(&mut store, req, topic, ttl).await
201 }
202
203 Routes::CasGet(hash) => {
204 let reader = store.cas_reader(hash).await?;
205 let stream = ReaderStream::new(reader);
206
207 let stream = stream.map(|frame| {
208 let frame = frame.unwrap();
209 Ok(hyper::body::Frame::data(frame))
210 });
211
212 let body = StreamBody::new(stream).boxed();
213 Ok(Response::new(body))
214 }
215
216 Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
217
218 Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
219
220 Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
221
222 Routes::LastGet {
223 topic,
224 last,
225 follow,
226 } => handle_last_get(&store, topic.as_deref(), last, follow).await,
227
228 Routes::Import => handle_import(&mut store, req.into_body()).await,
229
230 Routes::Eval => handle_eval(&store, req.into_body()).await,
231
232 Routes::NotFound => response_404(),
233 Routes::BadRequest(msg) => response_400(msg),
234 };
235
236 res.or_else(|e| response_500(e.to_string()))
237}
238
239async fn handle_stream_cat(
240 store: &mut Store,
241 options: ReadOptions,
242 accept_type: AcceptType,
243) -> HTTPResult {
244 let rx = store.read(options).await;
245 let stream = ReceiverStream::new(rx);
246
247 let accept_type_clone = accept_type.clone();
248 let stream = stream.map(move |frame| {
249 let bytes = match accept_type_clone {
250 AcceptType::Ndjson => {
251 let mut encoded = serde_json::to_vec(&frame).unwrap();
252 encoded.push(b'\n');
253 encoded
254 }
255 AcceptType::EventStream => format!(
256 "id: {id}\ndata: {data}\n\n",
257 id = frame.id,
258 data = serde_json::to_string(&frame).unwrap_or_default()
259 )
260 .into_bytes(),
261 };
262 Ok(hyper::body::Frame::data(Bytes::from(bytes)))
263 });
264
265 let body = StreamBody::new(stream).boxed();
266
267 let content_type = match accept_type {
268 AcceptType::Ndjson => "application/x-ndjson",
269 AcceptType::EventStream => "text/event-stream",
270 };
271
272 Ok(Response::builder()
273 .status(StatusCode::OK)
274 .header("Content-Type", content_type)
275 .body(body)?)
276}
277
278async fn handle_stream_append(
279 store: &mut Store,
280 req: Request<hyper::body::Incoming>,
281 topic: String,
282 ttl: Option<TTL>,
283) -> HTTPResult {
284 let (parts, mut body) = req.into_parts();
285
286 let hash = {
287 let mut writer = store.cas_writer().await?;
288 let mut bytes_written = 0;
289
290 while let Some(frame) = body.frame().await {
291 if let Ok(data) = frame?.into_data() {
292 writer.write_all(&data).await?;
293 bytes_written += data.len();
294 }
295 }
296
297 if bytes_written > 0 {
298 Some(writer.commit().await?)
299 } else {
300 None
301 }
302 };
303
304 let meta = match parts
305 .headers
306 .get("xs-meta")
307 .map(|x| x.to_str())
308 .transpose()
309 .unwrap()
310 .map(|s| {
311 base64::prelude::BASE64_STANDARD
313 .decode(s)
314 .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
315 .and_then(|decoded| {
316 String::from_utf8(decoded)
318 .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
319 .and_then(|json_str| {
320 serde_json::from_str(&json_str)
322 .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
323 })
324 })
325 })
326 .transpose()
327 {
328 Ok(meta) => meta,
329 Err(e) => return response_400(e.to_string()),
330 };
331
332 let frame = store.append(
333 Frame::builder(topic)
334 .maybe_hash(hash)
335 .maybe_meta(meta)
336 .maybe_ttl(ttl)
337 .build(),
338 )?;
339
340 Ok(Response::builder()
341 .status(StatusCode::OK)
342 .header("Content-Type", "application/json")
343 .body(full(serde_json::to_string(&frame).unwrap()))?)
344}
345
346async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
347 let hash = {
348 let mut writer = store.cas_writer().await?;
349 let mut bytes_written = 0;
350
351 while let Some(frame) = body.frame().await {
352 if let Ok(data) = frame?.into_data() {
353 writer.write_all(&data).await?;
354 bytes_written += data.len();
355 }
356 }
357
358 if bytes_written == 0 {
359 return response_400("Empty body".to_string());
360 }
361
362 writer.commit().await?
363 };
364
365 Ok(Response::builder()
366 .status(StatusCode::OK)
367 .header("Content-Type", "text/plain")
368 .body(full(hash.to_string()))?)
369}
370
371async fn handle_version() -> HTTPResult {
372 let version = env!("CARGO_PKG_VERSION");
373 let version_info = serde_json::json!({ "version": version });
374 Ok(Response::builder()
375 .status(StatusCode::OK)
376 .header("Content-Type", "application/json")
377 .body(full(serde_json::to_string(&version_info).unwrap()))?)
378}
379
380pub async fn serve(
381 store: Store,
382 engine: nu::Engine,
383 expose: Option<String>,
384) -> Result<(), BoxError> {
385 let path = store.path.join("sock").to_string_lossy().to_string();
386 let listener = Listener::bind(&path).await?;
387
388 let mut listeners = vec![listener];
389 let mut expose_meta = None;
390
391 if let Some(expose) = expose {
392 let expose_listener = Listener::bind(&expose).await?;
393
394 if let Some(ticket) = expose_listener.get_ticket() {
396 expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
397 } else {
398 expose_meta = Some(serde_json::json!({"expose": expose}));
399 }
400
401 listeners.push(expose_listener);
402 }
403
404 if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
405 tracing::error!("Failed to append xs.start frame: {}", e);
406 }
407
408 let mut tasks = Vec::new();
409 for listener in listeners {
410 let store = store.clone();
411 let engine = engine.clone();
412 let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
413 tasks.push(task);
414 }
415
416 for task in tasks {
419 task.await??;
420 }
421
422 Ok(())
423}
424
425async fn listener_loop(
426 mut listener: Listener,
427 store: Store,
428 engine: nu::Engine,
429) -> Result<(), BoxError> {
430 loop {
431 let (stream, _) = listener.accept().await?;
432 let io = TokioIo::new(stream);
433 let store = store.clone();
434 let engine = engine.clone();
435 tokio::task::spawn(async move {
436 if let Err(err) = http1::Builder::new()
437 .serve_connection(
438 io,
439 service_fn(move |req| handle(store.clone(), engine.clone(), req)),
440 )
441 .await
442 {
443 if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
445 source
446 .downcast_ref::<std::io::Error>()
447 .map(|io_err| io_err.kind())
448 }) {
449 } else {
451 tracing::error!("TBD: {:?}", err);
453 }
454 }
455 });
456 }
457}
458
459fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
460 if let Some(frame) = frame {
461 Ok(Response::builder()
462 .status(StatusCode::OK)
463 .header("Content-Type", "application/json")
464 .body(full(serde_json::to_string(&frame).unwrap()))?)
465 } else {
466 response_404()
467 }
468}
469
470async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
471 match store.remove(&id) {
472 Ok(()) => Ok(Response::builder()
473 .status(StatusCode::NO_CONTENT)
474 .body(empty())?),
475 Err(e) => {
476 tracing::error!("Failed to remove item {}: {:?}", id, e);
477
478 Ok(Response::builder()
479 .status(StatusCode::INTERNAL_SERVER_ERROR)
480 .body(full("internal-error"))?)
481 }
482 }
483}
484
485async fn handle_last_get(
486 store: &Store,
487 topic: Option<&str>,
488 last: usize,
489 follow: bool,
490) -> HTTPResult {
491 if !follow {
492 let options = ReadOptions::builder()
493 .last(last)
494 .maybe_topic(topic.map(|t| t.to_string()))
495 .build();
496
497 let frames: Vec<Frame> = store.read_sync(options).collect();
498
499 if frames.is_empty() {
500 return response_404();
501 }
502
503 if last == 1 {
507 return response_frame_or_404(Some(frames.into_iter().next().unwrap()));
508 }
509
510 let mut body = Vec::new();
511 for frame in frames {
512 serde_json::to_writer(&mut body, &frame).unwrap();
513 body.push(b'\n');
514 }
515
516 return Ok(Response::builder()
517 .status(StatusCode::OK)
518 .header("Content-Type", "application/x-ndjson")
519 .body(full(body))?);
520 }
521
522 let rx = store
525 .read(
526 ReadOptions::builder()
527 .last(last)
528 .maybe_topic(topic.map(|t| t.to_string()))
529 .follow(FollowOption::On)
530 .build(),
531 )
532 .await;
533
534 let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|frame| {
535 let mut bytes = serde_json::to_vec(&frame).unwrap();
536 bytes.push(b'\n');
537 Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
538 });
539
540 Ok(Response::builder()
541 .status(StatusCode::OK)
542 .header("Content-Type", "application/x-ndjson")
543 .body(StreamBody::new(stream).boxed())?)
544}
545
546async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
547 let bytes = body.collect().await?.to_bytes();
548 let frame: Frame = match serde_json::from_slice(&bytes) {
549 Ok(frame) => frame,
550 Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
551 };
552
553 store.insert_frame(&frame)?;
554
555 Ok(Response::builder()
556 .status(StatusCode::OK)
557 .header("Content-Type", "application/json")
558 .body(full(serde_json::to_string(&frame).unwrap()))?)
559}
560
561fn response_404() -> HTTPResult {
562 Ok(Response::builder()
563 .status(StatusCode::NOT_FOUND)
564 .body(empty())?)
565}
566
567fn response_400(message: String) -> HTTPResult {
568 let body = full(message);
569 Ok(Response::builder()
570 .status(StatusCode::BAD_REQUEST)
571 .body(body)?)
572}
573
574fn response_500(message: String) -> HTTPResult {
575 let body = full(message);
576 Ok(Response::builder()
577 .status(StatusCode::INTERNAL_SERVER_ERROR)
578 .body(body)?)
579}
580
581fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
582 Full::new(chunk.into())
583 .map_err(|never| match never {})
584 .boxed()
585}
586
587fn empty() -> BoxBody<Bytes, BoxError> {
588 Empty::<Bytes>::new()
589 .map_err(|never| match never {})
590 .boxed()
591}
592
593async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
594 let bytes = body.collect().await?.to_bytes();
596 let script =
597 String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
598
599 let mut engine =
601 nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
602
603 nu::add_core_commands(&mut engine, store)
605 .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
606
607 engine
609 .add_commands(vec![
610 Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
611 store.clone(),
612 )),
613 Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
614 store.clone(),
615 )),
616 Box::new(nu::commands::append_command::AppendCommand::new(
617 store.clone(),
618 serde_json::Value::Null,
619 )),
620 ])
621 .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
622
623 let result = engine
625 .eval(nu_protocol::PipelineData::empty(), script)
626 .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
627
628 match result {
630 nu_protocol::PipelineData::ByteStream(stream, ..) => {
631 if let Some(mut reader) = stream.reader() {
633 use std::io::Read;
634
635 let (tx, rx) = tokio::sync::mpsc::channel(16);
636
637 std::thread::spawn(move || {
639 let mut buffer = [0u8; 8192];
640 loop {
641 match reader.read(&mut buffer) {
642 Ok(0) => break, Ok(n) => {
644 let chunk = Bytes::copy_from_slice(&buffer[..n]);
645 if tx
646 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
647 .is_err()
648 {
649 break;
650 }
651 }
652 Err(e) => {
653 let _ = tx.blocking_send(Err(
654 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
655 ));
656 break;
657 }
658 }
659 }
660 });
661
662 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
663 let body = StreamBody::new(stream).boxed();
664 Ok(Response::builder()
665 .status(StatusCode::OK)
666 .header("Content-Type", "application/octet-stream")
667 .body(body)?)
668 } else {
669 Ok(Response::builder()
671 .status(StatusCode::OK)
672 .header("Content-Type", "application/octet-stream")
673 .body(empty())?)
674 }
675 }
676 nu_protocol::PipelineData::ListStream(stream, ..) => {
677 let (tx, rx) = tokio::sync::mpsc::channel(16);
679
680 std::thread::spawn(move || {
682 for value in stream.into_iter() {
683 let json = nu::value_to_json(&value);
684 match serde_json::to_vec(&json) {
685 Ok(mut json_bytes) => {
686 json_bytes.push(b'\n'); let chunk = Bytes::from(json_bytes);
688 if tx
689 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
690 .is_err()
691 {
692 break;
693 }
694 }
695 Err(e) => {
696 let _ = tx.blocking_send(Err(
697 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
698 ));
699 break;
700 }
701 }
702 }
703 });
704
705 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
706 let body = StreamBody::new(stream).boxed();
707 Ok(Response::builder()
708 .status(StatusCode::OK)
709 .header("Content-Type", "application/x-ndjson")
710 .body(body)?)
711 }
712 nu_protocol::PipelineData::Value(value, ..) => {
713 match &value {
714 nu_protocol::Value::String { .. }
715 | nu_protocol::Value::Int { .. }
716 | nu_protocol::Value::Float { .. }
717 | nu_protocol::Value::Bool { .. } => {
718 let text = match value {
720 nu_protocol::Value::String { val, .. } => val.clone(),
721 nu_protocol::Value::Int { val, .. } => val.to_string(),
722 nu_protocol::Value::Float { val, .. } => val.to_string(),
723 nu_protocol::Value::Bool { val, .. } => val.to_string(),
724 _ => value.into_string().unwrap_or_else(|_| "".to_string()),
725 };
726 Ok(Response::builder()
727 .status(StatusCode::OK)
728 .header("Content-Type", "text/plain")
729 .body(full(text))?)
730 }
731 _ => {
732 let json = nu::value_to_json(&value);
734 let json_string = serde_json::to_string(&json)
735 .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
736 Ok(Response::builder()
737 .status(StatusCode::OK)
738 .header("Content-Type", "application/json")
739 .body(full(json_string))?)
740 }
741 }
742 }
743 nu_protocol::PipelineData::Empty => {
744 Ok(Response::builder()
746 .status(StatusCode::NO_CONTENT)
747 .body(empty())?)
748 }
749 }
750}
751
752#[cfg(test)]
753mod tests {
754 use super::*;
755
756 #[test]
757 fn test_match_route_last() {
758 let headers = hyper::HeaderMap::new();
759
760 assert!(matches!(
762 match_route(&Method::GET, "/last/test", &headers, None),
763 Routes::LastGet { topic: Some(t), last: 1, follow: false } if t == "test"
764 ));
765
766 assert!(matches!(
768 match_route(&Method::GET, "/last/test", &headers, Some("follow=true")),
769 Routes::LastGet { topic: Some(t), last: 1, follow: true } if t == "test"
770 ));
771
772 assert!(matches!(
774 match_route(&Method::GET, "/last", &headers, None),
775 Routes::LastGet {
776 topic: None,
777 last: 1,
778 follow: false
779 }
780 ));
781
782 assert!(matches!(
784 match_route(&Method::GET, "/last", &headers, Some("last=5")),
785 Routes::LastGet {
786 topic: None,
787 last: 5,
788 follow: false
789 }
790 ));
791
792 assert!(matches!(
794 match_route(&Method::GET, "/last/test", &headers, Some("last=3&follow=true")),
795 Routes::LastGet { topic: Some(t), last: 3, follow: true } if t == "test"
796 ));
797 }
798
799 #[tokio::test]
800 async fn test_handle_last_get_ndjson_format() {
801 use crate::store::Store;
802 use http_body_util::BodyExt;
803
804 let temp_dir = tempfile::tempdir().unwrap();
805 let store = Store::new(temp_dir.path().to_path_buf());
806
807 store
809 .append(crate::store::Frame::builder("test").build())
810 .unwrap();
811
812 let response = handle_last_get(&store, Some("test"), 1, false)
814 .await
815 .unwrap();
816 let body = response.into_body().collect().await.unwrap().to_bytes();
817 let body_str = String::from_utf8(body.to_vec()).unwrap();
818 assert!(
819 !body_str.ends_with('\n'),
820 "Response should be JSON (no trailing newline) when last=1, got: {:?}",
821 body_str
822 );
823
824 let response = handle_last_get(&store, Some("test"), 3, false)
826 .await
827 .unwrap();
828 let body = response.into_body().collect().await.unwrap().to_bytes();
829 let body_str = String::from_utf8(body.to_vec()).unwrap();
830 assert!(
831 body_str.ends_with('\n'),
832 "Response should be NDJSON (end with newline) when last > 1, got: {:?}",
833 body_str
834 );
835
836 let headers = hyper::HeaderMap::new();
838 let route = match_route(&Method::GET, "/last/test", &headers, Some("last=3"));
839 if let Routes::LastGet {
840 topic,
841 last,
842 follow,
843 } = route
844 {
845 assert_eq!(last, 3, "Route should parse last=3");
846 let response = handle_last_get(&store, topic.as_deref(), last, follow)
847 .await
848 .unwrap();
849 let body = response.into_body().collect().await.unwrap().to_bytes();
850 let body_str = String::from_utf8(body.to_vec()).unwrap();
851 assert!(
852 body_str.ends_with('\n'),
853 "Full chain should return NDJSON when last=3, got: {:?}",
854 body_str
855 );
856 } else {
857 panic!("Expected Routes::LastGet");
858 }
859 }
860
861 #[tokio::test]
862 async fn test_handle_eval_logic() {
863 use crate::nu::Engine;
865 use crate::store::Store;
866 use nu_protocol::PipelineData;
867
868 let temp_dir = tempfile::tempdir().unwrap();
870 let store = Store::new(temp_dir.path().to_path_buf());
871
872 let mut engine = Engine::new().unwrap();
874
875 crate::nu::add_core_commands(&mut engine, &store).unwrap();
877
878 engine
880 .add_commands(vec![
881 Box::new(
882 crate::nu::commands::cat_stream_command::CatStreamCommand::new(store.clone()),
883 ),
884 Box::new(
885 crate::nu::commands::last_stream_command::LastStreamCommand::new(store.clone()),
886 ),
887 Box::new(crate::nu::commands::append_command::AppendCommand::new(
888 store.clone(),
889 serde_json::Value::Null,
890 )),
891 ])
892 .unwrap();
893
894 let result = engine
896 .eval(PipelineData::empty(), r#""hello world""#.to_string())
897 .unwrap();
898
899 match result {
900 PipelineData::Value(value, ..) => {
901 let text = value.into_string().unwrap();
902 assert_eq!(text, "hello world");
903 }
904 _ => panic!("Expected Value, got {:?}", result),
905 }
906
907 let result = engine
909 .eval(PipelineData::empty(), "2 + 3".to_string())
910 .unwrap();
911
912 match result {
913 PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
914 assert_eq!(val, 5);
915 }
916 _ => panic!("Expected Int Value, got {:?}", result),
917 }
918 }
919}