1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use chrono::{DateTime, Utc};
6use scru128::Scru128Id;
7
8use base64::Engine;
9
10use tokio::io::AsyncWriteExt;
11use tokio_stream::wrappers::ReceiverStream;
12use tokio_stream::StreamExt;
13use tokio_util::io::ReaderStream;
14
15use http_body_util::StreamBody;
16use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
17use hyper::body::Bytes;
18use hyper::header::ACCEPT;
19use hyper::server::conn::http1;
20use hyper::service::service_fn;
21use hyper::{Method, Request, Response, StatusCode};
22use hyper_util::rt::TokioIo;
23
24use crate::listener::Listener;
25use crate::nu;
26use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
27
28type BoxError = Box<dyn std::error::Error + Send + Sync>;
29type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
30
31fn serialize_frame(frame: &Frame, with_timestamp: bool) -> String {
33 if with_timestamp {
34 let mut value = serde_json::to_value(frame).unwrap();
35 if let serde_json::Value::Object(ref mut map) = value {
36 let millis = frame.id.timestamp() as i64;
37 let dt: DateTime<Utc> = DateTime::from_timestamp_millis(millis)
38 .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
39 map.insert("timestamp".to_string(), serde_json::json!(dt.to_rfc3339()));
40 }
41 serde_json::to_string(&value).unwrap()
42 } else {
43 serde_json::to_string(frame).unwrap()
44 }
45}
46
/// Wire format used when streaming frames from `GET /`.
#[derive(Clone, Debug, PartialEq)]
enum AcceptType {
    /// Newline-delimited JSON, one frame per line; used unless the request asks
    /// for an event stream.
    Ndjson,
    /// Server-sent events; selected when the `Accept` header is exactly
    /// `text/event-stream`.
    EventStream,
}
52
53enum Routes {
54 StreamCat {
55 accept_type: AcceptType,
56 options: ReadOptions,
57 with_timestamp: bool,
58 },
59 StreamAppend {
60 topic: String,
61 ttl: Option<TTL>,
62 with_timestamp: bool,
63 },
64 LastGet {
65 topic: Option<String>,
66 last: usize,
67 follow: bool,
68 with_timestamp: bool,
69 },
70 StreamItemGet {
71 id: Scru128Id,
72 with_timestamp: bool,
73 },
74 StreamItemRemove(Scru128Id),
75 CasGet(ssri::Integrity),
76 CasPost,
77 Import,
78 Eval,
79 Version,
80 NotFound,
81 BadRequest(String),
82}
83
84fn validate_integrity(integrity: &ssri::Integrity) -> bool {
86 if integrity.hashes.is_empty() {
88 return false;
89 }
90
91 for hash in &integrity.hashes {
93 if base64::engine::general_purpose::STANDARD
95 .decode(&hash.digest)
96 .is_err()
97 {
98 return false;
99 }
100 }
101
102 true
103}
104
105fn match_route(
106 method: &Method,
107 path: &str,
108 headers: &hyper::HeaderMap,
109 query: Option<&str>,
110) -> Routes {
111 let params: HashMap<String, String> =
112 url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
113 .into_owned()
114 .collect();
115
116 match (method, path) {
117 (&Method::GET, "/version") => Routes::Version,
118
119 (&Method::GET, "/") => {
120 let accept_type = match headers.get(ACCEPT) {
121 Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
122 _ => AcceptType::Ndjson,
123 };
124
125 let options = ReadOptions::from_query(query);
126 let with_timestamp = params.contains_key("with-timestamp");
127
128 match options {
129 Ok(options) => Routes::StreamCat {
130 accept_type,
131 options,
132 with_timestamp,
133 },
134 Err(e) => Routes::BadRequest(e.to_string()),
135 }
136 }
137
138 (&Method::GET, "/last") => {
139 let follow = params.contains_key("follow");
140 let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
141 let with_timestamp = params.contains_key("with-timestamp");
142 Routes::LastGet {
143 topic: None,
144 last,
145 follow,
146 with_timestamp,
147 }
148 }
149
150 (&Method::GET, p) if p.starts_with("/last/") => {
151 let topic = p.strip_prefix("/last/").unwrap().to_string();
152 let follow = params.contains_key("follow");
153 let last = params.get("last").and_then(|v| v.parse().ok()).unwrap_or(1);
154 let with_timestamp = params.contains_key("with-timestamp");
155 Routes::LastGet {
156 topic: Some(topic),
157 last,
158 follow,
159 with_timestamp,
160 }
161 }
162
163 (&Method::GET, p) if p.starts_with("/cas/") => {
164 if let Some(hash) = p.strip_prefix("/cas/") {
165 match ssri::Integrity::from_str(hash) {
166 Ok(integrity) => {
167 if validate_integrity(&integrity) {
168 Routes::CasGet(integrity)
169 } else {
170 Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
171 }
172 }
173 Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
174 }
175 } else {
176 Routes::NotFound
177 }
178 }
179
180 (&Method::POST, "/cas") => Routes::CasPost,
181 (&Method::POST, "/import") => Routes::Import,
182 (&Method::POST, "/eval") => Routes::Eval,
183
184 (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
185 Ok(id) => {
186 let with_timestamp = params.contains_key("with-timestamp");
187 Routes::StreamItemGet { id, with_timestamp }
188 }
189 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
190 },
191
192 (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
193 Ok(id) => Routes::StreamItemRemove(id),
194 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
195 },
196
197 (&Method::POST, path) if path.starts_with("/append/") => {
198 let topic = path.strip_prefix("/append/").unwrap().to_string();
199 let with_timestamp = params.contains_key("with-timestamp");
200
201 match TTL::from_query(query) {
202 Ok(ttl) => Routes::StreamAppend {
203 topic,
204 ttl: Some(ttl),
205 with_timestamp,
206 },
207 Err(e) => Routes::BadRequest(e.to_string()),
208 }
209 }
210
211 _ => Routes::NotFound,
212 }
213}
214
215async fn handle(
216 mut store: Store,
217 _engine: nu::Engine, req: Request<hyper::body::Incoming>,
219) -> HTTPResult {
220 let method = req.method();
221 let path = req.uri().path();
222 let headers = req.headers().clone();
223 let query = req.uri().query();
224
225 let res = match match_route(method, path, &headers, query) {
226 Routes::Version => handle_version().await,
227
228 Routes::StreamCat {
229 accept_type,
230 options,
231 with_timestamp,
232 } => handle_stream_cat(&mut store, options, accept_type, with_timestamp).await,
233
234 Routes::StreamAppend {
235 topic,
236 ttl,
237 with_timestamp,
238 } => handle_stream_append(&mut store, req, topic, ttl, with_timestamp).await,
239
240 Routes::CasGet(hash) => {
241 let reader = store.cas_reader(hash).await?;
242 let stream = ReaderStream::new(reader);
243
244 let stream = stream.map(|frame| {
245 let frame = frame.unwrap();
246 Ok(hyper::body::Frame::data(frame))
247 });
248
249 let body = StreamBody::new(stream).boxed();
250 Ok(Response::new(body))
251 }
252
253 Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
254
255 Routes::StreamItemGet { id, with_timestamp } => {
256 response_frame_or_404(store.get(&id), with_timestamp)
257 }
258
259 Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
260
261 Routes::LastGet {
262 topic,
263 last,
264 follow,
265 with_timestamp,
266 } => handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp).await,
267
268 Routes::Import => handle_import(&mut store, req.into_body()).await,
269
270 Routes::Eval => handle_eval(&store, req.into_body()).await,
271
272 Routes::NotFound => response_404(),
273 Routes::BadRequest(msg) => response_400(msg),
274 };
275
276 res.or_else(|e| response_500(e.to_string()))
277}
278
279async fn handle_stream_cat(
280 store: &mut Store,
281 options: ReadOptions,
282 accept_type: AcceptType,
283 with_timestamp: bool,
284) -> HTTPResult {
285 let rx = store.read(options).await;
286 let stream = ReceiverStream::new(rx);
287
288 let accept_type_clone = accept_type.clone();
289 let stream = stream.map(move |frame| {
290 let bytes = match accept_type_clone {
291 AcceptType::Ndjson => {
292 let mut encoded = serialize_frame(&frame, with_timestamp).into_bytes();
293 encoded.push(b'\n');
294 encoded
295 }
296 AcceptType::EventStream => format!(
297 "id: {id}\ndata: {data}\n\n",
298 id = frame.id,
299 data = serialize_frame(&frame, with_timestamp)
300 )
301 .into_bytes(),
302 };
303 Ok(hyper::body::Frame::data(Bytes::from(bytes)))
304 });
305
306 let body = StreamBody::new(stream).boxed();
307
308 let content_type = match accept_type {
309 AcceptType::Ndjson => "application/x-ndjson",
310 AcceptType::EventStream => "text/event-stream",
311 };
312
313 Ok(Response::builder()
314 .status(StatusCode::OK)
315 .header("Content-Type", content_type)
316 .body(body)?)
317}
318
319async fn handle_stream_append(
320 store: &mut Store,
321 req: Request<hyper::body::Incoming>,
322 topic: String,
323 ttl: Option<TTL>,
324 with_timestamp: bool,
325) -> HTTPResult {
326 let (parts, mut body) = req.into_parts();
327
328 let hash = {
329 let mut writer = store.cas_writer().await?;
330 let mut bytes_written = 0;
331
332 while let Some(frame) = body.frame().await {
333 if let Ok(data) = frame?.into_data() {
334 writer.write_all(&data).await?;
335 bytes_written += data.len();
336 }
337 }
338
339 if bytes_written > 0 {
340 Some(writer.commit().await?)
341 } else {
342 None
343 }
344 };
345
346 let meta = match parts
347 .headers
348 .get("xs-meta")
349 .map(|x| x.to_str())
350 .transpose()
351 .unwrap()
352 .map(|s| {
353 base64::prelude::BASE64_STANDARD
355 .decode(s)
356 .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
357 .and_then(|decoded| {
358 String::from_utf8(decoded)
360 .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
361 .and_then(|json_str| {
362 serde_json::from_str(&json_str)
364 .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
365 })
366 })
367 })
368 .transpose()
369 {
370 Ok(meta) => meta,
371 Err(e) => return response_400(e.to_string()),
372 };
373
374 let frame = store.append(
375 Frame::builder(topic)
376 .maybe_hash(hash)
377 .maybe_meta(meta)
378 .maybe_ttl(ttl)
379 .build(),
380 )?;
381
382 Ok(Response::builder()
383 .status(StatusCode::OK)
384 .header("Content-Type", "application/json")
385 .body(full(serialize_frame(&frame, with_timestamp)))?)
386}
387
388async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
389 let hash = {
390 let mut writer = store.cas_writer().await?;
391 let mut bytes_written = 0;
392
393 while let Some(frame) = body.frame().await {
394 if let Ok(data) = frame?.into_data() {
395 writer.write_all(&data).await?;
396 bytes_written += data.len();
397 }
398 }
399
400 if bytes_written == 0 {
401 return response_400("Empty body".to_string());
402 }
403
404 writer.commit().await?
405 };
406
407 Ok(Response::builder()
408 .status(StatusCode::OK)
409 .header("Content-Type", "text/plain")
410 .body(full(hash.to_string()))?)
411}
412
413async fn handle_version() -> HTTPResult {
414 let version = env!("CARGO_PKG_VERSION");
415 let version_info = serde_json::json!({ "version": version });
416 Ok(Response::builder()
417 .status(StatusCode::OK)
418 .header("Content-Type", "application/json")
419 .body(full(serde_json::to_string(&version_info).unwrap()))?)
420}
421
422pub async fn serve(
423 store: Store,
424 engine: nu::Engine,
425 expose: Option<String>,
426) -> Result<(), BoxError> {
427 let path = store.path.join("sock").to_string_lossy().to_string();
428 let listener = Listener::bind(&path).await?;
429
430 let mut listeners = vec![listener];
431 let mut expose_meta = None;
432
433 if let Some(expose) = expose {
434 let expose_listener = Listener::bind(&expose).await?;
435
436 if let Some(ticket) = expose_listener.get_ticket() {
438 expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
439 } else {
440 expose_meta = Some(serde_json::json!({"expose": expose}));
441 }
442
443 listeners.push(expose_listener);
444 }
445
446 if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
447 tracing::error!("Failed to append xs.start frame: {}", e);
448 }
449
450 let mut tasks = Vec::new();
451 for listener in listeners {
452 let store = store.clone();
453 let engine = engine.clone();
454 let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
455 tasks.push(task);
456 }
457
458 for task in tasks {
461 task.await??;
462 }
463
464 Ok(())
465}
466
467async fn listener_loop(
468 mut listener: Listener,
469 store: Store,
470 engine: nu::Engine,
471) -> Result<(), BoxError> {
472 loop {
473 let (stream, _) = listener.accept().await?;
474 let io = TokioIo::new(stream);
475 let store = store.clone();
476 let engine = engine.clone();
477 tokio::task::spawn(async move {
478 if let Err(err) = http1::Builder::new()
479 .serve_connection(
480 io,
481 service_fn(move |req| handle(store.clone(), engine.clone(), req)),
482 )
483 .await
484 {
485 if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
487 source
488 .downcast_ref::<std::io::Error>()
489 .map(|io_err| io_err.kind())
490 }) {
491 } else {
493 tracing::error!("TBD: {:?}", err);
495 }
496 }
497 });
498 }
499}
500
501fn response_frame_or_404(frame: Option<store::Frame>, with_timestamp: bool) -> HTTPResult {
502 if let Some(frame) = frame {
503 Ok(Response::builder()
504 .status(StatusCode::OK)
505 .header("Content-Type", "application/json")
506 .body(full(serialize_frame(&frame, with_timestamp)))?)
507 } else {
508 response_404()
509 }
510}
511
512async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
513 match store.remove(&id) {
514 Ok(()) => Ok(Response::builder()
515 .status(StatusCode::NO_CONTENT)
516 .body(empty())?),
517 Err(e) => {
518 tracing::error!("Failed to remove item {}: {:?}", id, e);
519
520 Ok(Response::builder()
521 .status(StatusCode::INTERNAL_SERVER_ERROR)
522 .body(full("internal-error"))?)
523 }
524 }
525}
526
527async fn handle_last_get(
528 store: &Store,
529 topic: Option<&str>,
530 last: usize,
531 follow: bool,
532 with_timestamp: bool,
533) -> HTTPResult {
534 if !follow {
535 let options = ReadOptions::builder()
536 .last(last)
537 .maybe_topic(topic.map(|t| t.to_string()))
538 .build();
539
540 let frames: Vec<Frame> = store.read_sync(options).collect();
541
542 if frames.is_empty() {
543 return response_404();
544 }
545
546 if last == 1 {
550 return response_frame_or_404(Some(frames.into_iter().next().unwrap()), with_timestamp);
551 }
552
553 let mut body = Vec::new();
554 for frame in frames {
555 body.extend(serialize_frame(&frame, with_timestamp).into_bytes());
556 body.push(b'\n');
557 }
558
559 return Ok(Response::builder()
560 .status(StatusCode::OK)
561 .header("Content-Type", "application/x-ndjson")
562 .body(full(body))?);
563 }
564
565 let rx = store
568 .read(
569 ReadOptions::builder()
570 .last(last)
571 .maybe_topic(topic.map(|t| t.to_string()))
572 .follow(FollowOption::On)
573 .build(),
574 )
575 .await;
576
577 let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(move |frame| {
578 let mut bytes = serialize_frame(&frame, with_timestamp).into_bytes();
579 bytes.push(b'\n');
580 Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
581 });
582
583 Ok(Response::builder()
584 .status(StatusCode::OK)
585 .header("Content-Type", "application/x-ndjson")
586 .body(StreamBody::new(stream).boxed())?)
587}
588
589async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
590 let bytes = body.collect().await?.to_bytes();
591 let frame: Frame = match serde_json::from_slice(&bytes) {
592 Ok(frame) => frame,
593 Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
594 };
595
596 store.insert_frame(&frame)?;
597
598 Ok(Response::builder()
599 .status(StatusCode::OK)
600 .header("Content-Type", "application/json")
601 .body(full(serde_json::to_string(&frame).unwrap()))?)
602}
603
604fn response_404() -> HTTPResult {
605 Ok(Response::builder()
606 .status(StatusCode::NOT_FOUND)
607 .body(empty())?)
608}
609
610fn response_400(message: String) -> HTTPResult {
611 let body = full(message);
612 Ok(Response::builder()
613 .status(StatusCode::BAD_REQUEST)
614 .body(body)?)
615}
616
617fn response_500(message: String) -> HTTPResult {
618 let body = full(message);
619 Ok(Response::builder()
620 .status(StatusCode::INTERNAL_SERVER_ERROR)
621 .body(body)?)
622}
623
624fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
625 Full::new(chunk.into())
626 .map_err(|never| match never {})
627 .boxed()
628}
629
630fn empty() -> BoxBody<Bytes, BoxError> {
631 Empty::<Bytes>::new()
632 .map_err(|never| match never {})
633 .boxed()
634}
635
636async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
637 let bytes = body.collect().await?.to_bytes();
639 let script =
640 String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
641
642 let mut engine =
644 nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
645
646 nu::add_core_commands(&mut engine, store)
648 .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
649
650 engine
652 .add_commands(vec![
653 Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
654 store.clone(),
655 )),
656 Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
657 store.clone(),
658 )),
659 Box::new(nu::commands::append_command::AppendCommand::new(
660 store.clone(),
661 serde_json::Value::Null,
662 )),
663 ])
664 .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
665
666 let result = engine
668 .eval(nu_protocol::PipelineData::empty(), script)
669 .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
670
671 match result {
673 nu_protocol::PipelineData::ByteStream(stream, ..) => {
674 if let Some(mut reader) = stream.reader() {
676 use std::io::Read;
677
678 let (tx, rx) = tokio::sync::mpsc::channel(16);
679
680 std::thread::spawn(move || {
682 let mut buffer = [0u8; 8192];
683 loop {
684 match reader.read(&mut buffer) {
685 Ok(0) => break, Ok(n) => {
687 let chunk = Bytes::copy_from_slice(&buffer[..n]);
688 if tx
689 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
690 .is_err()
691 {
692 break;
693 }
694 }
695 Err(e) => {
696 let _ = tx.blocking_send(Err(
697 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
698 ));
699 break;
700 }
701 }
702 }
703 });
704
705 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
706 let body = StreamBody::new(stream).boxed();
707 Ok(Response::builder()
708 .status(StatusCode::OK)
709 .header("Content-Type", "application/octet-stream")
710 .body(body)?)
711 } else {
712 Ok(Response::builder()
714 .status(StatusCode::OK)
715 .header("Content-Type", "application/octet-stream")
716 .body(empty())?)
717 }
718 }
719 nu_protocol::PipelineData::ListStream(stream, ..) => {
720 let (tx, rx) = tokio::sync::mpsc::channel(16);
722
723 std::thread::spawn(move || {
725 for value in stream.into_iter() {
726 let json = nu::value_to_json(&value);
727 match serde_json::to_vec(&json) {
728 Ok(mut json_bytes) => {
729 json_bytes.push(b'\n'); let chunk = Bytes::from(json_bytes);
731 if tx
732 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
733 .is_err()
734 {
735 break;
736 }
737 }
738 Err(e) => {
739 let _ = tx.blocking_send(Err(
740 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
741 ));
742 break;
743 }
744 }
745 }
746 });
747
748 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
749 let body = StreamBody::new(stream).boxed();
750 Ok(Response::builder()
751 .status(StatusCode::OK)
752 .header("Content-Type", "application/x-ndjson")
753 .body(body)?)
754 }
755 nu_protocol::PipelineData::Value(value, ..) => {
756 match &value {
757 nu_protocol::Value::String { .. }
758 | nu_protocol::Value::Int { .. }
759 | nu_protocol::Value::Float { .. }
760 | nu_protocol::Value::Bool { .. } => {
761 let text = match value {
763 nu_protocol::Value::String { val, .. } => val.clone(),
764 nu_protocol::Value::Int { val, .. } => val.to_string(),
765 nu_protocol::Value::Float { val, .. } => val.to_string(),
766 nu_protocol::Value::Bool { val, .. } => val.to_string(),
767 _ => value.into_string().unwrap_or_else(|_| "".to_string()),
768 };
769 Ok(Response::builder()
770 .status(StatusCode::OK)
771 .header("Content-Type", "text/plain")
772 .body(full(text))?)
773 }
774 _ => {
775 let json = nu::value_to_json(&value);
777 let json_string = serde_json::to_string(&json)
778 .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
779 Ok(Response::builder()
780 .status(StatusCode::OK)
781 .header("Content-Type", "application/json")
782 .body(full(json_string))?)
783 }
784 }
785 }
786 nu_protocol::PipelineData::Empty => {
787 Ok(Response::builder()
789 .status(StatusCode::NO_CONTENT)
790 .body(empty())?)
791 }
792 }
793}
794
#[cfg(test)]
mod tests {
    use super::*;

    /// Routes `GET path` with the given query string and no headers.
    fn route_get(path: &str, query: Option<&str>) -> Routes {
        let headers = hyper::HeaderMap::new();
        match_route(&Method::GET, path, &headers, query)
    }

    /// Collects a whole response body into a `String`.
    async fn body_text(response: Response<BoxBody<Bytes, BoxError>>) -> String {
        use http_body_util::BodyExt;

        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        String::from_utf8(bytes.to_vec()).unwrap()
    }

    #[test]
    fn test_match_route_last() {
        // Topic in the path, defaults for everything else.
        let route = route_get("/last/test", None);
        assert!(matches!(
            route,
            Routes::LastGet { topic: Some(t), last: 1, follow: false, .. } if t == "test"
        ));

        // `follow` is a presence flag.
        let route = route_get("/last/test", Some("follow=true"));
        assert!(matches!(
            route,
            Routes::LastGet { topic: Some(t), last: 1, follow: true, .. } if t == "test"
        ));

        // No topic at all.
        let route = route_get("/last", None);
        assert!(matches!(
            route,
            Routes::LastGet {
                topic: None,
                last: 1,
                follow: false,
                ..
            }
        ));

        // Explicit `last` count.
        let route = route_get("/last", Some("last=5"));
        assert!(matches!(
            route,
            Routes::LastGet {
                topic: None,
                last: 5,
                follow: false,
                ..
            }
        ));

        // Topic, count and follow combined.
        let route = route_get("/last/test", Some("last=3&follow=true"));
        assert!(matches!(
            route,
            Routes::LastGet { topic: Some(t), last: 3, follow: true, .. } if t == "test"
        ));
    }

    #[tokio::test]
    async fn test_handle_last_get_ndjson_format() {
        use crate::store::Store;

        let temp_dir = tempfile::tempdir().unwrap();
        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();

        store
            .append(crate::store::Frame::builder("test").build())
            .unwrap();

        // last=1 returns a bare JSON document.
        let single = handle_last_get(&store, Some("test"), 1, false, false)
            .await
            .unwrap();
        let body_str = body_text(single).await;
        assert!(
            !body_str.ends_with('\n'),
            "Response should be JSON (no trailing newline) when last=1, got: {:?}",
            body_str
        );

        // last>1 returns NDJSON, even when only one frame exists.
        let several = handle_last_get(&store, Some("test"), 3, false, false)
            .await
            .unwrap();
        let body_str = body_text(several).await;
        assert!(
            body_str.ends_with('\n'),
            "Response should be NDJSON (end with newline) when last > 1, got: {:?}",
            body_str
        );

        // Same again, driven through the router.
        match route_get("/last/test", Some("last=3")) {
            Routes::LastGet {
                topic,
                last,
                follow,
                with_timestamp,
            } => {
                assert_eq!(last, 3, "Route should parse last=3");
                let routed =
                    handle_last_get(&store, topic.as_deref(), last, follow, with_timestamp)
                        .await
                        .unwrap();
                let body_str = body_text(routed).await;
                assert!(
                    body_str.ends_with('\n'),
                    "Full chain should return NDJSON when last=3, got: {:?}",
                    body_str
                );
            }
            _ => panic!("Expected Routes::LastGet"),
        }
    }

    #[tokio::test]
    async fn test_handle_eval_logic() {
        use crate::nu::commands::append_command::AppendCommand;
        use crate::nu::commands::cat_stream_command::CatStreamCommand;
        use crate::nu::commands::last_stream_command::LastStreamCommand;
        use crate::nu::Engine;
        use crate::store::Store;
        use nu_protocol::PipelineData;

        let temp_dir = tempfile::tempdir().unwrap();
        let store = Store::new(temp_dir.path().to_path_buf()).unwrap();

        // Build an engine with the same command set that `handle_eval` registers.
        let mut engine = Engine::new().unwrap();
        crate::nu::add_core_commands(&mut engine, &store).unwrap();
        engine
            .add_commands(vec![
                Box::new(CatStreamCommand::new(store.clone())),
                Box::new(LastStreamCommand::new(store.clone())),
                Box::new(AppendCommand::new(store.clone(), serde_json::Value::Null)),
            ])
            .unwrap();

        // A string literal evaluates to a string value.
        let result = engine
            .eval(PipelineData::empty(), r#""hello world""#.to_string())
            .unwrap();
        match result {
            PipelineData::Value(value, ..) => {
                assert_eq!(value.into_string().unwrap(), "hello world");
            }
            _ => panic!("Expected Value, got {:?}", result),
        }

        // Arithmetic evaluates to an int value.
        let result = engine
            .eval(PipelineData::empty(), "2 + 3".to_string())
            .unwrap();
        match result {
            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
                assert_eq!(val, 5);
            }
            _ => panic!("Expected Int Value, got {:?}", result),
        }
    }
}