1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
/// Boxed, thread-safe error used as the error type throughout this module.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Result returned by the request handlers: a response carrying a boxed
/// `Bytes` body, or a [`BoxError`].
type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
/// Output format of the `GET /` stream, selected by `match_route` from the
/// request's `Accept` header.
#[derive(Debug, PartialEq, Clone)]
enum AcceptType {
    /// Newline-delimited JSON (`application/x-ndjson`); used unless the
    /// `Accept` header is exactly `text/event-stream`.
    Ndjson,
    /// Server-sent events (`text/event-stream`).
    EventStream,
}
35
/// Outcome of routing a request; produced by `match_route` and dispatched by
/// `handle`.
enum Routes {
    /// `GET /`: stream frames using the given read options and output format.
    StreamCat {
        accept_type: AcceptType,
        options: ReadOptions,
    },
    /// `POST /append/<topic>`: append a frame to `topic`, with the TTL parsed
    /// from the query string.
    StreamAppend {
        topic: String,
        ttl: Option<TTL>,
    },
    /// `GET /last/<topic>`: the most recent frame for `topic`; `follow` is set
    /// when the query string contains a `follow` key.
    LastGet {
        topic: String,
        follow: bool,
    },
    /// `GET /<id>`: fetch a single frame by its SCRU128 id.
    StreamItemGet(Scru128Id),
    /// `DELETE /<id>`: remove a single frame by its SCRU128 id.
    StreamItemRemove(Scru128Id),
    /// `GET /cas/<integrity>`: read content-addressed data.
    CasGet(ssri::Integrity),
    /// `POST /cas`: store the request body as content-addressed data.
    CasPost,
    /// `POST /import`: insert a frame supplied as JSON in the request body.
    Import,
    /// `POST /eval`: evaluate the request body as a script.
    Eval,
    /// `GET /version`: report the crate version.
    Version,
    /// No route matched.
    NotFound,
    /// The route matched but its path or query was invalid; carries the
    /// message returned to the client.
    BadRequest(String),
}
59
60fn validate_integrity(integrity: &ssri::Integrity) -> bool {
62 if integrity.hashes.is_empty() {
64 return false;
65 }
66
67 for hash in &integrity.hashes {
69 if base64::engine::general_purpose::STANDARD
71 .decode(&hash.digest)
72 .is_err()
73 {
74 return false;
75 }
76 }
77
78 true
79}
80
81fn match_route(
82 method: &Method,
83 path: &str,
84 headers: &hyper::HeaderMap,
85 query: Option<&str>,
86) -> Routes {
87 let params: HashMap<String, String> =
88 url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
89 .into_owned()
90 .collect();
91
92 match (method, path) {
93 (&Method::GET, "/version") => Routes::Version,
94
95 (&Method::GET, "/") => {
96 let accept_type = match headers.get(ACCEPT) {
97 Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
98 _ => AcceptType::Ndjson,
99 };
100
101 let options = ReadOptions::from_query(query);
102
103 match options {
104 Ok(options) => Routes::StreamCat {
105 accept_type,
106 options,
107 },
108 Err(e) => Routes::BadRequest(e.to_string()),
109 }
110 }
111
112 (&Method::GET, p) if p.starts_with("/last/") => {
113 let topic = p.strip_prefix("/last/").unwrap().to_string();
114 let follow = params.contains_key("follow");
115 Routes::LastGet { topic, follow }
116 }
117
118 (&Method::GET, p) if p.starts_with("/cas/") => {
119 if let Some(hash) = p.strip_prefix("/cas/") {
120 match ssri::Integrity::from_str(hash) {
121 Ok(integrity) => {
122 if validate_integrity(&integrity) {
123 Routes::CasGet(integrity)
124 } else {
125 Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
126 }
127 }
128 Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
129 }
130 } else {
131 Routes::NotFound
132 }
133 }
134
135 (&Method::POST, "/cas") => Routes::CasPost,
136 (&Method::POST, "/import") => Routes::Import,
137 (&Method::POST, "/eval") => Routes::Eval,
138
139 (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
140 Ok(id) => Routes::StreamItemGet(id),
141 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
142 },
143
144 (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
145 Ok(id) => Routes::StreamItemRemove(id),
146 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
147 },
148
149 (&Method::POST, path) if path.starts_with("/append/") => {
150 let topic = path.strip_prefix("/append/").unwrap().to_string();
151
152 match TTL::from_query(query) {
153 Ok(ttl) => Routes::StreamAppend {
154 topic,
155 ttl: Some(ttl),
156 },
157 Err(e) => Routes::BadRequest(e.to_string()),
158 }
159 }
160
161 _ => Routes::NotFound,
162 }
163}
164
165async fn handle(
166 mut store: Store,
167 _engine: nu::Engine, req: Request<hyper::body::Incoming>,
169) -> HTTPResult {
170 let method = req.method();
171 let path = req.uri().path();
172 let headers = req.headers().clone();
173 let query = req.uri().query();
174
175 let res = match match_route(method, path, &headers, query) {
176 Routes::Version => handle_version().await,
177
178 Routes::StreamCat {
179 accept_type,
180 options,
181 } => handle_stream_cat(&mut store, options, accept_type).await,
182
183 Routes::StreamAppend { topic, ttl } => {
184 handle_stream_append(&mut store, req, topic, ttl).await
185 }
186
187 Routes::CasGet(hash) => {
188 let reader = store.cas_reader(hash).await?;
189 let stream = ReaderStream::new(reader);
190
191 let stream = stream.map(|frame| {
192 let frame = frame.unwrap();
193 Ok(hyper::body::Frame::data(frame))
194 });
195
196 let body = StreamBody::new(stream).boxed();
197 Ok(Response::new(body))
198 }
199
200 Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
201
202 Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
203
204 Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
205
206 Routes::LastGet { topic, follow } => handle_last_get(&store, &topic, follow).await,
207
208 Routes::Import => handle_import(&mut store, req.into_body()).await,
209
210 Routes::Eval => handle_eval(&store, req.into_body()).await,
211
212 Routes::NotFound => response_404(),
213 Routes::BadRequest(msg) => response_400(msg),
214 };
215
216 res.or_else(|e| response_500(e.to_string()))
217}
218
219async fn handle_stream_cat(
220 store: &mut Store,
221 options: ReadOptions,
222 accept_type: AcceptType,
223) -> HTTPResult {
224 let rx = store.read(options).await;
225 let stream = ReceiverStream::new(rx);
226
227 let accept_type_clone = accept_type.clone();
228 let stream = stream.map(move |frame| {
229 let bytes = match accept_type_clone {
230 AcceptType::Ndjson => {
231 let mut encoded = serde_json::to_vec(&frame).unwrap();
232 encoded.push(b'\n');
233 encoded
234 }
235 AcceptType::EventStream => format!(
236 "id: {id}\ndata: {data}\n\n",
237 id = frame.id,
238 data = serde_json::to_string(&frame).unwrap_or_default()
239 )
240 .into_bytes(),
241 };
242 Ok(hyper::body::Frame::data(Bytes::from(bytes)))
243 });
244
245 let body = StreamBody::new(stream).boxed();
246
247 let content_type = match accept_type {
248 AcceptType::Ndjson => "application/x-ndjson",
249 AcceptType::EventStream => "text/event-stream",
250 };
251
252 Ok(Response::builder()
253 .status(StatusCode::OK)
254 .header("Content-Type", content_type)
255 .body(body)?)
256}
257
258async fn handle_stream_append(
259 store: &mut Store,
260 req: Request<hyper::body::Incoming>,
261 topic: String,
262 ttl: Option<TTL>,
263) -> HTTPResult {
264 let (parts, mut body) = req.into_parts();
265
266 let hash = {
267 let mut writer = store.cas_writer().await?;
268 let mut bytes_written = 0;
269
270 while let Some(frame) = body.frame().await {
271 if let Ok(data) = frame?.into_data() {
272 writer.write_all(&data).await?;
273 bytes_written += data.len();
274 }
275 }
276
277 if bytes_written > 0 {
278 Some(writer.commit().await?)
279 } else {
280 None
281 }
282 };
283
284 let meta = match parts
285 .headers
286 .get("xs-meta")
287 .map(|x| x.to_str())
288 .transpose()
289 .unwrap()
290 .map(|s| {
291 base64::prelude::BASE64_STANDARD
293 .decode(s)
294 .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
295 .and_then(|decoded| {
296 String::from_utf8(decoded)
298 .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
299 .and_then(|json_str| {
300 serde_json::from_str(&json_str)
302 .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
303 })
304 })
305 })
306 .transpose()
307 {
308 Ok(meta) => meta,
309 Err(e) => return response_400(e.to_string()),
310 };
311
312 let frame = store.append(
313 Frame::builder(topic)
314 .maybe_hash(hash)
315 .maybe_meta(meta)
316 .maybe_ttl(ttl)
317 .build(),
318 )?;
319
320 Ok(Response::builder()
321 .status(StatusCode::OK)
322 .header("Content-Type", "application/json")
323 .body(full(serde_json::to_string(&frame).unwrap()))?)
324}
325
326async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
327 let hash = {
328 let mut writer = store.cas_writer().await?;
329 let mut bytes_written = 0;
330
331 while let Some(frame) = body.frame().await {
332 if let Ok(data) = frame?.into_data() {
333 writer.write_all(&data).await?;
334 bytes_written += data.len();
335 }
336 }
337
338 if bytes_written == 0 {
339 return response_400("Empty body".to_string());
340 }
341
342 writer.commit().await?
343 };
344
345 Ok(Response::builder()
346 .status(StatusCode::OK)
347 .header("Content-Type", "text/plain")
348 .body(full(hash.to_string()))?)
349}
350
351async fn handle_version() -> HTTPResult {
352 let version = env!("CARGO_PKG_VERSION");
353 let version_info = serde_json::json!({ "version": version });
354 Ok(Response::builder()
355 .status(StatusCode::OK)
356 .header("Content-Type", "application/json")
357 .body(full(serde_json::to_string(&version_info).unwrap()))?)
358}
359
360pub async fn serve(
361 store: Store,
362 engine: nu::Engine,
363 expose: Option<String>,
364) -> Result<(), BoxError> {
365 let path = store.path.join("sock").to_string_lossy().to_string();
366 let listener = Listener::bind(&path).await?;
367
368 let mut listeners = vec![listener];
369 let mut expose_meta = None;
370
371 if let Some(expose) = expose {
372 let expose_listener = Listener::bind(&expose).await?;
373
374 if let Some(ticket) = expose_listener.get_ticket() {
376 expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
377 } else {
378 expose_meta = Some(serde_json::json!({"expose": expose}));
379 }
380
381 listeners.push(expose_listener);
382 }
383
384 if let Err(e) = store.append(Frame::builder("xs.start").maybe_meta(expose_meta).build()) {
385 tracing::error!("Failed to append xs.start frame: {}", e);
386 }
387
388 let mut tasks = Vec::new();
389 for listener in listeners {
390 let store = store.clone();
391 let engine = engine.clone();
392 let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
393 tasks.push(task);
394 }
395
396 for task in tasks {
399 task.await??;
400 }
401
402 Ok(())
403}
404
405async fn listener_loop(
406 mut listener: Listener,
407 store: Store,
408 engine: nu::Engine,
409) -> Result<(), BoxError> {
410 loop {
411 let (stream, _) = listener.accept().await?;
412 let io = TokioIo::new(stream);
413 let store = store.clone();
414 let engine = engine.clone();
415 tokio::task::spawn(async move {
416 if let Err(err) = http1::Builder::new()
417 .serve_connection(
418 io,
419 service_fn(move |req| handle(store.clone(), engine.clone(), req)),
420 )
421 .await
422 {
423 if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
425 source
426 .downcast_ref::<std::io::Error>()
427 .map(|io_err| io_err.kind())
428 }) {
429 } else {
431 tracing::error!("TBD: {:?}", err);
433 }
434 }
435 });
436 }
437}
438
439fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
440 if let Some(frame) = frame {
441 Ok(Response::builder()
442 .status(StatusCode::OK)
443 .header("Content-Type", "application/json")
444 .body(full(serde_json::to_string(&frame).unwrap()))?)
445 } else {
446 response_404()
447 }
448}
449
450async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
451 match store.remove(&id) {
452 Ok(()) => Ok(Response::builder()
453 .status(StatusCode::NO_CONTENT)
454 .body(empty())?),
455 Err(e) => {
456 tracing::error!("Failed to remove item {}: {:?}", id, e);
457
458 Ok(Response::builder()
459 .status(StatusCode::INTERNAL_SERVER_ERROR)
460 .body(full("internal-error"))?)
461 }
462 }
463}
464
465async fn handle_last_get(store: &Store, topic: &str, follow: bool) -> HTTPResult {
466 let current_head = store.last(topic);
467
468 if !follow {
469 return response_frame_or_404(current_head);
470 }
471
472 let rx = store
473 .read(
474 ReadOptions::builder()
475 .follow(FollowOption::On)
476 .new(true)
477 .maybe_after(current_head.as_ref().map(|f| f.id))
478 .build(),
479 )
480 .await;
481
482 let topic = topic.to_string();
483 let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
484 .filter(move |frame| frame.topic == topic)
485 .map(|frame| {
486 let mut bytes = serde_json::to_vec(&frame).unwrap();
487 bytes.push(b'\n');
488 Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
489 });
490
491 let body = if let Some(frame) = current_head {
492 let mut head_bytes = serde_json::to_vec(&frame).unwrap();
493 head_bytes.push(b'\n');
494 let head_chunk = Ok(hyper::body::Frame::data(Bytes::from(head_bytes)));
495 StreamBody::new(futures::stream::once(async { head_chunk }).chain(stream)).boxed()
496 } else {
497 StreamBody::new(stream).boxed()
498 };
499
500 Ok(Response::builder()
501 .status(StatusCode::OK)
502 .header("Content-Type", "application/x-ndjson")
503 .body(body)?)
504}
505
506async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
507 let bytes = body.collect().await?.to_bytes();
508 let frame: Frame = match serde_json::from_slice(&bytes) {
509 Ok(frame) => frame,
510 Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
511 };
512
513 store.insert_frame(&frame)?;
514
515 Ok(Response::builder()
516 .status(StatusCode::OK)
517 .header("Content-Type", "application/json")
518 .body(full(serde_json::to_string(&frame).unwrap()))?)
519}
520
521fn response_404() -> HTTPResult {
522 Ok(Response::builder()
523 .status(StatusCode::NOT_FOUND)
524 .body(empty())?)
525}
526
527fn response_400(message: String) -> HTTPResult {
528 let body = full(message);
529 Ok(Response::builder()
530 .status(StatusCode::BAD_REQUEST)
531 .body(body)?)
532}
533
534fn response_500(message: String) -> HTTPResult {
535 let body = full(message);
536 Ok(Response::builder()
537 .status(StatusCode::INTERNAL_SERVER_ERROR)
538 .body(body)?)
539}
540
541fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
542 Full::new(chunk.into())
543 .map_err(|never| match never {})
544 .boxed()
545}
546
547fn empty() -> BoxBody<Bytes, BoxError> {
548 Empty::<Bytes>::new()
549 .map_err(|never| match never {})
550 .boxed()
551}
552
553async fn handle_eval(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
554 let bytes = body.collect().await?.to_bytes();
556 let script =
557 String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
558
559 let mut engine =
561 nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
562
563 nu::add_core_commands(&mut engine, store)
565 .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
566
567 engine
569 .add_commands(vec![
570 Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
571 store.clone(),
572 )),
573 Box::new(nu::commands::last_stream_command::LastStreamCommand::new(
574 store.clone(),
575 )),
576 Box::new(nu::commands::append_command::AppendCommand::new(
577 store.clone(),
578 serde_json::Value::Null,
579 )),
580 ])
581 .map_err(|e| format!("Failed to add streaming commands to engine: {e}"))?;
582
583 let result = engine
585 .eval(nu_protocol::PipelineData::empty(), script)
586 .map_err(|e| format!("Script evaluation failed:\n{e}"))?;
587
588 match result {
590 nu_protocol::PipelineData::ByteStream(stream, ..) => {
591 if let Some(mut reader) = stream.reader() {
593 use std::io::Read;
594
595 let (tx, rx) = tokio::sync::mpsc::channel(16);
596
597 std::thread::spawn(move || {
599 let mut buffer = [0u8; 8192];
600 loop {
601 match reader.read(&mut buffer) {
602 Ok(0) => break, Ok(n) => {
604 let chunk = Bytes::copy_from_slice(&buffer[..n]);
605 if tx
606 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
607 .is_err()
608 {
609 break;
610 }
611 }
612 Err(e) => {
613 let _ = tx.blocking_send(Err(
614 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
615 ));
616 break;
617 }
618 }
619 }
620 });
621
622 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
623 let body = StreamBody::new(stream).boxed();
624 Ok(Response::builder()
625 .status(StatusCode::OK)
626 .header("Content-Type", "application/octet-stream")
627 .body(body)?)
628 } else {
629 Ok(Response::builder()
631 .status(StatusCode::OK)
632 .header("Content-Type", "application/octet-stream")
633 .body(empty())?)
634 }
635 }
636 nu_protocol::PipelineData::ListStream(stream, ..) => {
637 let (tx, rx) = tokio::sync::mpsc::channel(16);
639
640 std::thread::spawn(move || {
642 for value in stream.into_iter() {
643 let json = nu::value_to_json(&value);
644 match serde_json::to_vec(&json) {
645 Ok(mut json_bytes) => {
646 json_bytes.push(b'\n'); let chunk = Bytes::from(json_bytes);
648 if tx
649 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
650 .is_err()
651 {
652 break;
653 }
654 }
655 Err(e) => {
656 let _ = tx.blocking_send(Err(
657 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
658 ));
659 break;
660 }
661 }
662 }
663 });
664
665 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
666 let body = StreamBody::new(stream).boxed();
667 Ok(Response::builder()
668 .status(StatusCode::OK)
669 .header("Content-Type", "application/x-ndjson")
670 .body(body)?)
671 }
672 nu_protocol::PipelineData::Value(value, ..) => {
673 match &value {
674 nu_protocol::Value::String { .. }
675 | nu_protocol::Value::Int { .. }
676 | nu_protocol::Value::Float { .. }
677 | nu_protocol::Value::Bool { .. } => {
678 let text = match value {
680 nu_protocol::Value::String { val, .. } => val.clone(),
681 nu_protocol::Value::Int { val, .. } => val.to_string(),
682 nu_protocol::Value::Float { val, .. } => val.to_string(),
683 nu_protocol::Value::Bool { val, .. } => val.to_string(),
684 _ => value.into_string().unwrap_or_else(|_| "".to_string()),
685 };
686 Ok(Response::builder()
687 .status(StatusCode::OK)
688 .header("Content-Type", "text/plain")
689 .body(full(text))?)
690 }
691 _ => {
692 let json = nu::value_to_json(&value);
694 let json_string = serde_json::to_string(&json)
695 .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
696 Ok(Response::builder()
697 .status(StatusCode::OK)
698 .header("Content-Type", "application/json")
699 .body(full(json_string))?)
700 }
701 }
702 }
703 nu_protocol::PipelineData::Empty => {
704 Ok(Response::builder()
706 .status(StatusCode::NO_CONTENT)
707 .body(empty())?)
708 }
709 }
710}
711
#[cfg(test)]
mod tests {
    use super::*;

    // `GET /last/<topic>` sets `follow` only when the query has a `follow` key.
    #[test]
    fn test_match_route_last_follow() {
        let headers = hyper::HeaderMap::new();

        // No query string: follow is off.
        assert!(matches!(
            match_route(&Method::GET, "/last/test", &headers, None),
            Routes::LastGet { topic, follow: false } if topic == "test"
        ));

        // `follow` present in the query: follow is on.
        assert!(matches!(
            match_route(&Method::GET, "/last/test", &headers, Some("follow=true")),
            Routes::LastGet { topic, follow: true } if topic == "test"
        ));
    }

    // Builds an engine with the same commands `handle_eval` registers and
    // evaluates two scripts on it directly; `handle_eval` itself is not called.
    #[tokio::test]
    async fn test_handle_eval_logic() {
        use crate::nu::Engine;
        use crate::store::Store;
        use nu_protocol::PipelineData;

        // Store backed by a temporary directory.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = Store::new(temp_dir.path().to_path_buf());

        let mut engine = Engine::new().unwrap();

        crate::nu::add_core_commands(&mut engine, &store).unwrap();

        engine
            .add_commands(vec![
                Box::new(
                    crate::nu::commands::cat_stream_command::CatStreamCommand::new(store.clone()),
                ),
                Box::new(
                    crate::nu::commands::last_stream_command::LastStreamCommand::new(store.clone()),
                ),
                Box::new(crate::nu::commands::append_command::AppendCommand::new(
                    store.clone(),
                    serde_json::Value::Null,
                )),
            ])
            .unwrap();

        // A string literal evaluates to a string value.
        let result = engine
            .eval(PipelineData::empty(), r#""hello world""#.to_string())
            .unwrap();

        match result {
            PipelineData::Value(value, ..) => {
                let text = value.into_string().unwrap();
                assert_eq!(text, "hello world");
            }
            _ => panic!("Expected Value, got {:?}", result),
        }

        // Integer arithmetic evaluates to an int value.
        let result = engine
            .eval(PipelineData::empty(), "2 + 3".to_string())
            .unwrap();

        match result {
            PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
                assert_eq!(val, 5);
            }
            _ => panic!("Expected Int Value, got {:?}", result),
        }
    }
}