1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4
5use scru128::Scru128Id;
6
7use base64::Engine;
8
9use tokio::io::AsyncWriteExt;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::StreamExt;
12use tokio_util::io::ReaderStream;
13
14use http_body_util::StreamBody;
15use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
16use hyper::body::Bytes;
17use hyper::header::ACCEPT;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper::{Method, Request, Response, StatusCode};
21use hyper_util::rt::TokioIo;
22
23use crate::listener::Listener;
24use crate::nu;
25use crate::store::{self, FollowOption, Frame, ReadOptions, Store, TTL};
26
27type BoxError = Box<dyn std::error::Error + Send + Sync>;
28type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;
29
30#[derive(Debug, PartialEq, Clone)]
31enum AcceptType {
32 Ndjson,
33 EventStream,
34}
35
36enum Routes {
37 StreamCat {
38 accept_type: AcceptType,
39 options: ReadOptions,
40 },
41 StreamAppend {
42 topic: String,
43 ttl: Option<TTL>,
44 context_id: Scru128Id,
45 },
46 HeadGet {
47 topic: String,
48 follow: bool,
49 context_id: Scru128Id,
50 },
51 StreamItemGet(Scru128Id),
52 StreamItemRemove(Scru128Id),
53 CasGet(ssri::Integrity),
54 CasPost,
55 Import,
56 Exec,
57 Version,
58 NotFound,
59 BadRequest(String),
60}
61
62fn validate_integrity(integrity: &ssri::Integrity) -> bool {
64 if integrity.hashes.is_empty() {
66 return false;
67 }
68
69 for hash in &integrity.hashes {
71 if base64::engine::general_purpose::STANDARD
73 .decode(&hash.digest)
74 .is_err()
75 {
76 return false;
77 }
78 }
79
80 true
81}
82
83fn match_route(
84 method: &Method,
85 path: &str,
86 headers: &hyper::HeaderMap,
87 query: Option<&str>,
88) -> Routes {
89 let params: HashMap<String, String> =
90 url::form_urlencoded::parse(query.unwrap_or("").as_bytes())
91 .into_owned()
92 .collect();
93
94 match (method, path) {
95 (&Method::GET, "/version") => Routes::Version,
96
97 (&Method::GET, "/") => {
98 let accept_type = match headers.get(ACCEPT) {
99 Some(accept) if accept == "text/event-stream" => AcceptType::EventStream,
100 _ => AcceptType::Ndjson,
101 };
102
103 let options = ReadOptions::from_query(query);
104
105 match options {
106 Ok(options) => Routes::StreamCat {
107 accept_type,
108 options,
109 },
110 Err(e) => Routes::BadRequest(e.to_string()),
111 }
112 }
113
114 (&Method::GET, p) if p.starts_with("/head/") => {
115 let topic = p.strip_prefix("/head/").unwrap().to_string();
116 let follow = params.contains_key("follow");
117 let context_id = match params.get("context") {
118 None => crate::store::ZERO_CONTEXT,
119 Some(ctx) => match ctx.parse() {
120 Ok(id) => id,
121 Err(e) => return Routes::BadRequest(format!("Invalid context ID: {e}")),
122 },
123 };
124 Routes::HeadGet {
125 topic,
126 follow,
127 context_id,
128 }
129 }
130
131 (&Method::GET, p) if p.starts_with("/cas/") => {
132 if let Some(hash) = p.strip_prefix("/cas/") {
133 match ssri::Integrity::from_str(hash) {
134 Ok(integrity) => {
135 if validate_integrity(&integrity) {
136 Routes::CasGet(integrity)
137 } else {
138 Routes::BadRequest(format!("Invalid CAS hash format: {hash}"))
139 }
140 }
141 Err(e) => Routes::BadRequest(format!("Invalid CAS hash: {e}")),
142 }
143 } else {
144 Routes::NotFound
145 }
146 }
147
148 (&Method::POST, "/cas") => Routes::CasPost,
149 (&Method::POST, "/import") => Routes::Import,
150 (&Method::POST, "/exec") => Routes::Exec,
151
152 (&Method::GET, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
153 Ok(id) => Routes::StreamItemGet(id),
154 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
155 },
156
157 (&Method::DELETE, p) => match Scru128Id::from_str(p.trim_start_matches('/')) {
158 Ok(id) => Routes::StreamItemRemove(id),
159 Err(e) => Routes::BadRequest(format!("Invalid frame ID: {e}")),
160 },
161
162 (&Method::POST, path) if path.starts_with('/') => {
163 let topic = path.trim_start_matches('/').to_string();
164 let context_id = match params.get("context") {
165 None => crate::store::ZERO_CONTEXT,
166 Some(ctx) => match ctx.parse() {
167 Ok(id) => id,
168 Err(e) => return Routes::BadRequest(format!("Invalid context ID: {e}")),
169 },
170 };
171
172 match TTL::from_query(query) {
173 Ok(ttl) => Routes::StreamAppend {
174 topic,
175 ttl: Some(ttl),
176 context_id,
177 },
178 Err(e) => Routes::BadRequest(e.to_string()),
179 }
180 }
181
182 _ => Routes::NotFound,
183 }
184}
185
186async fn handle(
187 mut store: Store,
188 _engine: nu::Engine, req: Request<hyper::body::Incoming>,
190) -> HTTPResult {
191 let method = req.method();
192 let path = req.uri().path();
193 let headers = req.headers().clone();
194 let query = req.uri().query();
195
196 let res = match match_route(method, path, &headers, query) {
197 Routes::Version => handle_version().await,
198
199 Routes::StreamCat {
200 accept_type,
201 options,
202 } => handle_stream_cat(&mut store, options, accept_type).await,
203
204 Routes::StreamAppend {
205 topic,
206 ttl,
207 context_id,
208 } => handle_stream_append(&mut store, req, topic, ttl, context_id).await,
209
210 Routes::CasGet(hash) => {
211 let reader = store.cas_reader(hash).await?;
212 let stream = ReaderStream::new(reader);
213
214 let stream = stream.map(|frame| {
215 let frame = frame.unwrap();
216 Ok(hyper::body::Frame::data(frame))
217 });
218
219 let body = StreamBody::new(stream).boxed();
220 Ok(Response::new(body))
221 }
222
223 Routes::CasPost => handle_cas_post(&mut store, req.into_body()).await,
224
225 Routes::StreamItemGet(id) => response_frame_or_404(store.get(&id)),
226
227 Routes::StreamItemRemove(id) => handle_stream_item_remove(&mut store, id).await,
228
229 Routes::HeadGet {
230 topic,
231 follow,
232 context_id,
233 } => handle_head_get(&store, &topic, follow, context_id).await,
234
235 Routes::Import => handle_import(&mut store, req.into_body()).await,
236
237 Routes::Exec => handle_exec(&store, req.into_body()).await,
238
239 Routes::NotFound => response_404(),
240 Routes::BadRequest(msg) => response_400(msg),
241 };
242
243 res.or_else(|e| response_500(e.to_string()))
244}
245
246async fn handle_stream_cat(
247 store: &mut Store,
248 options: ReadOptions,
249 accept_type: AcceptType,
250) -> HTTPResult {
251 let rx = store.read(options).await;
252 let stream = ReceiverStream::new(rx);
253
254 let accept_type_clone = accept_type.clone();
255 let stream = stream.map(move |frame| {
256 let bytes = match accept_type_clone {
257 AcceptType::Ndjson => {
258 let mut encoded = serde_json::to_vec(&frame).unwrap();
259 encoded.push(b'\n');
260 encoded
261 }
262 AcceptType::EventStream => format!(
263 "id: {id}\ndata: {data}\n\n",
264 id = frame.id,
265 data = serde_json::to_string(&frame).unwrap_or_default()
266 )
267 .into_bytes(),
268 };
269 Ok(hyper::body::Frame::data(Bytes::from(bytes)))
270 });
271
272 let body = StreamBody::new(stream).boxed();
273
274 let content_type = match accept_type {
275 AcceptType::Ndjson => "application/x-ndjson",
276 AcceptType::EventStream => "text/event-stream",
277 };
278
279 Ok(Response::builder()
280 .status(StatusCode::OK)
281 .header("Content-Type", content_type)
282 .body(body)?)
283}
284
285async fn handle_stream_append(
286 store: &mut Store,
287 req: Request<hyper::body::Incoming>,
288 topic: String,
289 ttl: Option<TTL>,
290 context_id: Scru128Id,
291) -> HTTPResult {
292 let (parts, mut body) = req.into_parts();
293
294 let hash = {
295 let mut writer = store.cas_writer().await?;
296 let mut bytes_written = 0;
297
298 while let Some(frame) = body.frame().await {
299 if let Ok(data) = frame?.into_data() {
300 writer.write_all(&data).await?;
301 bytes_written += data.len();
302 }
303 }
304
305 if bytes_written > 0 {
306 Some(writer.commit().await?)
307 } else {
308 None
309 }
310 };
311
312 let meta = match parts
313 .headers
314 .get("xs-meta")
315 .map(|x| x.to_str())
316 .transpose()
317 .unwrap()
318 .map(|s| {
319 base64::prelude::BASE64_STANDARD
321 .decode(s)
322 .map_err(|e| format!("xs-meta isn't valid Base64: {e}"))
323 .and_then(|decoded| {
324 String::from_utf8(decoded)
326 .map_err(|e| format!("xs-meta isn't valid UTF-8: {e}"))
327 .and_then(|json_str| {
328 serde_json::from_str(&json_str)
330 .map_err(|e| format!("xs-meta isn't valid JSON: {e}"))
331 })
332 })
333 })
334 .transpose()
335 {
336 Ok(meta) => meta,
337 Err(e) => return response_400(e.to_string()),
338 };
339
340 let frame = store.append(
341 Frame::builder(topic, context_id)
342 .maybe_hash(hash)
343 .maybe_meta(meta)
344 .maybe_ttl(ttl)
345 .build(),
346 )?;
347
348 Ok(Response::builder()
349 .status(StatusCode::OK)
350 .header("Content-Type", "application/json")
351 .body(full(serde_json::to_string(&frame).unwrap()))?)
352}
353
354async fn handle_cas_post(store: &mut Store, mut body: hyper::body::Incoming) -> HTTPResult {
355 let hash = {
356 let mut writer = store.cas_writer().await?;
357 let mut bytes_written = 0;
358
359 while let Some(frame) = body.frame().await {
360 if let Ok(data) = frame?.into_data() {
361 writer.write_all(&data).await?;
362 bytes_written += data.len();
363 }
364 }
365
366 if bytes_written == 0 {
367 return response_400("Empty body".to_string());
368 }
369
370 writer.commit().await?
371 };
372
373 Ok(Response::builder()
374 .status(StatusCode::OK)
375 .header("Content-Type", "text/plain")
376 .body(full(hash.to_string()))?)
377}
378
379async fn handle_version() -> HTTPResult {
380 let version = env!("CARGO_PKG_VERSION");
381 let version_info = serde_json::json!({ "version": version });
382 Ok(Response::builder()
383 .status(StatusCode::OK)
384 .header("Content-Type", "application/json")
385 .body(full(serde_json::to_string(&version_info).unwrap()))?)
386}
387
388pub async fn serve(
389 store: Store,
390 engine: nu::Engine,
391 expose: Option<String>,
392) -> Result<(), BoxError> {
393 let path = store.path.join("sock").to_string_lossy().to_string();
394 let listener = Listener::bind(&path).await?;
395
396 let mut listeners = vec![listener];
397 let mut expose_meta = None;
398
399 if let Some(expose) = expose {
400 let expose_listener = Listener::bind(&expose).await?;
401
402 if let Some(ticket) = expose_listener.get_ticket() {
404 expose_meta = Some(serde_json::json!({"expose": format!("iroh://{}", ticket)}));
405 } else {
406 expose_meta = Some(serde_json::json!({"expose": expose}));
407 }
408
409 listeners.push(expose_listener);
410 }
411
412 if let Err(e) = store.append(
413 Frame::builder("xs.start", store::ZERO_CONTEXT)
414 .maybe_meta(expose_meta)
415 .build(),
416 ) {
417 tracing::error!("Failed to append xs.start frame: {}", e);
418 }
419
420 let mut tasks = Vec::new();
421 for listener in listeners {
422 let store = store.clone();
423 let engine = engine.clone();
424 let task = tokio::spawn(async move { listener_loop(listener, store, engine).await });
425 tasks.push(task);
426 }
427
428 for task in tasks {
431 task.await??;
432 }
433
434 Ok(())
435}
436
437async fn listener_loop(
438 mut listener: Listener,
439 store: Store,
440 engine: nu::Engine,
441) -> Result<(), BoxError> {
442 loop {
443 let (stream, _) = listener.accept().await?;
444 let io = TokioIo::new(stream);
445 let store = store.clone();
446 let engine = engine.clone();
447 tokio::task::spawn(async move {
448 if let Err(err) = http1::Builder::new()
449 .serve_connection(
450 io,
451 service_fn(move |req| handle(store.clone(), engine.clone(), req)),
452 )
453 .await
454 {
455 if let Some(std::io::ErrorKind::NotConnected) = err.source().and_then(|source| {
457 source
458 .downcast_ref::<std::io::Error>()
459 .map(|io_err| io_err.kind())
460 }) {
461 } else {
463 tracing::error!("TBD: {:?}", err);
465 }
466 }
467 });
468 }
469}
470
471fn response_frame_or_404(frame: Option<store::Frame>) -> HTTPResult {
472 if let Some(frame) = frame {
473 Ok(Response::builder()
474 .status(StatusCode::OK)
475 .header("Content-Type", "application/json")
476 .body(full(serde_json::to_string(&frame).unwrap()))?)
477 } else {
478 response_404()
479 }
480}
481
482async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResult {
483 match store.remove(&id) {
484 Ok(()) => Ok(Response::builder()
485 .status(StatusCode::NO_CONTENT)
486 .body(empty())?),
487 Err(e) => {
488 tracing::error!("Failed to remove item {}: {:?}", id, e);
489
490 Ok(Response::builder()
491 .status(StatusCode::INTERNAL_SERVER_ERROR)
492 .body(full("internal-error"))?)
493 }
494 }
495}
496
497async fn handle_head_get(
498 store: &Store,
499 topic: &str,
500 follow: bool,
501 context_id: Scru128Id,
502) -> HTTPResult {
503 let current_head = store.head(topic, context_id);
504
505 if !follow {
506 return response_frame_or_404(current_head);
507 }
508
509 let rx = store
510 .read(
511 ReadOptions::builder()
512 .follow(FollowOption::On)
513 .tail(true)
514 .maybe_last_id(current_head.as_ref().map(|f| f.id))
515 .build(),
516 )
517 .await;
518
519 let topic = topic.to_string();
520 let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
521 .filter(move |frame| frame.topic == topic)
522 .map(|frame| {
523 let mut bytes = serde_json::to_vec(&frame).unwrap();
524 bytes.push(b'\n');
525 Ok::<_, BoxError>(hyper::body::Frame::data(Bytes::from(bytes)))
526 });
527
528 let body = if let Some(frame) = current_head {
529 let mut head_bytes = serde_json::to_vec(&frame).unwrap();
530 head_bytes.push(b'\n');
531 let head_chunk = Ok(hyper::body::Frame::data(Bytes::from(head_bytes)));
532 StreamBody::new(futures::stream::once(async { head_chunk }).chain(stream)).boxed()
533 } else {
534 StreamBody::new(stream).boxed()
535 };
536
537 Ok(Response::builder()
538 .status(StatusCode::OK)
539 .header("Content-Type", "application/x-ndjson")
540 .body(body)?)
541}
542
543async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
544 let bytes = body.collect().await?.to_bytes();
545 let frame: Frame = match serde_json::from_slice(&bytes) {
546 Ok(frame) => frame,
547 Err(e) => return response_400(format!("Invalid frame JSON: {e}")),
548 };
549
550 store.insert_frame(&frame)?;
551
552 Ok(Response::builder()
553 .status(StatusCode::OK)
554 .header("Content-Type", "application/json")
555 .body(full(serde_json::to_string(&frame).unwrap()))?)
556}
557
558fn response_404() -> HTTPResult {
559 Ok(Response::builder()
560 .status(StatusCode::NOT_FOUND)
561 .body(empty())?)
562}
563
564fn response_400(message: String) -> HTTPResult {
565 let body = full(message);
566 Ok(Response::builder()
567 .status(StatusCode::BAD_REQUEST)
568 .body(body)?)
569}
570
571fn response_500(message: String) -> HTTPResult {
572 let body = full(message);
573 Ok(Response::builder()
574 .status(StatusCode::INTERNAL_SERVER_ERROR)
575 .body(body)?)
576}
577
578fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, BoxError> {
579 Full::new(chunk.into())
580 .map_err(|never| match never {})
581 .boxed()
582}
583
584fn empty() -> BoxBody<Bytes, BoxError> {
585 Empty::<Bytes>::new()
586 .map_err(|never| match never {})
587 .boxed()
588}
589
590async fn handle_exec(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
591 let bytes = body.collect().await?.to_bytes();
593 let script =
594 String::from_utf8(bytes.to_vec()).map_err(|e| format!("Invalid UTF-8 in script: {e}"))?;
595
596 let mut engine =
598 nu::Engine::new().map_err(|e| format!("Failed to create nushell engine: {e}"))?;
599
600 let context_id = crate::store::ZERO_CONTEXT;
602
603 nu::add_core_commands(&mut engine, store)
605 .map_err(|e| format!("Failed to add core commands to engine: {e}"))?;
606
607 engine
609 .add_commands(vec![
610 Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
611 store.clone(),
612 context_id,
613 )),
614 Box::new(nu::commands::head_stream_command::HeadStreamCommand::new(
615 store.clone(),
616 context_id,
617 )),
618 Box::new(nu::commands::append_command::AppendCommand::new(
619 store.clone(),
620 context_id,
621 serde_json::Value::Null,
622 )),
623 ])
624 .map_err(|e| format!("Failed to add context commands to engine: {e}"))?;
625
626 let result = engine
628 .eval(nu_protocol::PipelineData::empty(), script)
629 .map_err(|e| format!("Script execution failed:\n{e}"))?;
630
631 match result {
633 nu_protocol::PipelineData::ByteStream(stream, ..) => {
634 if let Some(mut reader) = stream.reader() {
636 use std::io::Read;
637
638 let (tx, rx) = tokio::sync::mpsc::channel(16);
639
640 std::thread::spawn(move || {
642 let mut buffer = [0u8; 8192];
643 loop {
644 match reader.read(&mut buffer) {
645 Ok(0) => break, Ok(n) => {
647 let chunk = Bytes::copy_from_slice(&buffer[..n]);
648 if tx
649 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
650 .is_err()
651 {
652 break;
653 }
654 }
655 Err(e) => {
656 let _ = tx.blocking_send(Err(
657 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
658 ));
659 break;
660 }
661 }
662 }
663 });
664
665 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
666 let body = StreamBody::new(stream).boxed();
667 Ok(Response::builder()
668 .status(StatusCode::OK)
669 .header("Content-Type", "application/octet-stream")
670 .body(body)?)
671 } else {
672 Ok(Response::builder()
674 .status(StatusCode::OK)
675 .header("Content-Type", "application/octet-stream")
676 .body(empty())?)
677 }
678 }
679 nu_protocol::PipelineData::ListStream(stream, ..) => {
680 let (tx, rx) = tokio::sync::mpsc::channel(16);
682
683 std::thread::spawn(move || {
685 for value in stream.into_iter() {
686 let json = nu::value_to_json(&value);
687 match serde_json::to_vec(&json) {
688 Ok(mut json_bytes) => {
689 json_bytes.push(b'\n'); let chunk = Bytes::from(json_bytes);
691 if tx
692 .blocking_send(Ok(hyper::body::Frame::data(chunk)))
693 .is_err()
694 {
695 break;
696 }
697 }
698 Err(e) => {
699 let _ = tx.blocking_send(Err(
700 Box::new(e) as Box<dyn std::error::Error + Send + Sync>
701 ));
702 break;
703 }
704 }
705 }
706 });
707
708 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
709 let body = StreamBody::new(stream).boxed();
710 Ok(Response::builder()
711 .status(StatusCode::OK)
712 .header("Content-Type", "application/x-ndjson")
713 .body(body)?)
714 }
715 nu_protocol::PipelineData::Value(value, ..) => {
716 match &value {
717 nu_protocol::Value::String { .. }
718 | nu_protocol::Value::Int { .. }
719 | nu_protocol::Value::Float { .. }
720 | nu_protocol::Value::Bool { .. } => {
721 let text = match value {
723 nu_protocol::Value::String { val, .. } => val.clone(),
724 nu_protocol::Value::Int { val, .. } => val.to_string(),
725 nu_protocol::Value::Float { val, .. } => val.to_string(),
726 nu_protocol::Value::Bool { val, .. } => val.to_string(),
727 _ => value.into_string().unwrap_or_else(|_| "".to_string()),
728 };
729 Ok(Response::builder()
730 .status(StatusCode::OK)
731 .header("Content-Type", "text/plain")
732 .body(full(text))?)
733 }
734 _ => {
735 let json = nu::value_to_json(&value);
737 let json_string = serde_json::to_string(&json)
738 .map_err(|e| format!("Failed to serialize JSON: {e}"))?;
739 Ok(Response::builder()
740 .status(StatusCode::OK)
741 .header("Content-Type", "application/json")
742 .body(full(json_string))?)
743 }
744 }
745 }
746 nu_protocol::PipelineData::Empty => {
747 Ok(Response::builder()
749 .status(StatusCode::NO_CONTENT)
750 .body(empty())?)
751 }
752 }
753}
754
755#[cfg(test)]
756mod tests {
757 use super::*;
758
759 #[test]
760 fn test_match_route_head_follow() {
761 let headers = hyper::HeaderMap::new();
762
763 assert!(matches!(
764 match_route(&Method::GET, "/head/test", &headers, None),
765 Routes::HeadGet { topic, follow: false, context_id: _ } if topic == "test"
766 ));
767
768 assert!(matches!(
769 match_route(&Method::GET, "/head/test", &headers, Some("follow=true")),
770 Routes::HeadGet { topic, follow: true, context_id: _ } if topic == "test"
771 ));
772 }
773
774 #[tokio::test]
775 async fn test_handle_exec_logic() {
776 use crate::nu::Engine;
778 use crate::store::Store;
779 use nu_protocol::PipelineData;
780
781 let temp_dir = tempfile::tempdir().unwrap();
783 let store = Store::new(temp_dir.path().to_path_buf());
784
785 let mut engine = Engine::new().unwrap();
787
788 let context_id = crate::store::ZERO_CONTEXT;
790
791 crate::nu::add_core_commands(&mut engine, &store).unwrap();
793
794 engine
796 .add_commands(vec![
797 Box::new(
798 crate::nu::commands::cat_stream_command::CatStreamCommand::new(
799 store.clone(),
800 context_id,
801 ),
802 ),
803 Box::new(
804 crate::nu::commands::head_stream_command::HeadStreamCommand::new(
805 store.clone(),
806 context_id,
807 ),
808 ),
809 Box::new(crate::nu::commands::append_command::AppendCommand::new(
810 store.clone(),
811 context_id,
812 serde_json::Value::Null,
813 )),
814 ])
815 .unwrap();
816
817 let result = engine
819 .eval(PipelineData::empty(), r#""hello world""#.to_string())
820 .unwrap();
821
822 match result {
823 PipelineData::Value(value, ..) => {
824 let text = value.into_string().unwrap();
825 assert_eq!(text, "hello world");
826 }
827 _ => panic!("Expected Value, got {:?}", result),
828 }
829
830 let result = engine
832 .eval(PipelineData::empty(), "2 + 3".to_string())
833 .unwrap();
834
835 match result {
836 PipelineData::Value(nu_protocol::Value::Int { val, .. }, ..) => {
837 assert_eq!(val, 5);
838 }
839 _ => panic!("Expected Int Value, got {:?}", result),
840 }
841 }
842}