1use hashtree_core::Hash;
20use serde::{Deserialize, Serialize};
21
/// Serde `default` hook for the `htl` fields in this file.
///
/// Returns `crate::types::MAX_HTL`, so a message decoded without an `htl`
/// entry ends up with the maximum value.
fn default_htl() -> u8 {
    crate::types::MAX_HTL
}
25
/// Serde `skip_serializing_if` predicate for the `htl` fields in this file.
///
/// Returns `true` when `htl` equals `crate::types::MAX_HTL`, in which case the
/// field is left out of the encoded message. Takes `&u8` because serde passes
/// the field by reference to `skip_serializing_if` functions.
fn is_max_htl(htl: &u8) -> bool {
    *htl == crate::types::MAX_HTL
}
29
// One-byte message type tags. Each `encode_*` function writes its tag as the
// first byte of the frame, and `parse_message` dispatches on that byte.

/// Tag for a [`DataRequest`] body.
pub const MSG_TYPE_REQUEST: u8 = 0x00;
/// Tag for a [`DataResponse`] body.
pub const MSG_TYPE_RESPONSE: u8 = 0x01;
/// Tag for a [`DataQuoteRequest`] body.
pub const MSG_TYPE_QUOTE_REQUEST: u8 = 0x02;
/// Tag for a [`DataQuoteResponse`] body.
pub const MSG_TYPE_QUOTE_RESPONSE: u8 = 0x03;
/// Tag for a [`DataPayment`] body.
pub const MSG_TYPE_PAYMENT: u8 = 0x04;
/// Tag for a [`DataPaymentAck`] body.
pub const MSG_TYPE_PAYMENT_ACK: u8 = 0x05;
/// Tag for a [`DataChunk`] body.
pub const MSG_TYPE_CHUNK: u8 = 0x06;
/// Tag for a [`PeerHints`] body.
pub const MSG_TYPE_PEER_HINTS: u8 = 0x07;
/// Tag for a [`PubsubInterest`] body.
pub const MSG_TYPE_PUBSUB_INTEREST: u8 = 0x08;
/// Tag for a [`PubsubFrame`] body.
pub const MSG_TYPE_PUBSUB_FRAME: u8 = 0x09;
/// Tag for a [`PubsubInventory`] body.
pub const MSG_TYPE_PUBSUB_INVENTORY: u8 = 0x0a;
/// Tag for a [`PubsubWant`] body.
pub const MSG_TYPE_PUBSUB_WANT: u8 = 0x0b;

/// 32 KiB, in bytes. Not referenced by any code visible in this file.
///
/// NOTE(review): presumably the maximum payload size of one response
/// fragment — confirm against the callers that split responses.
pub const FRAGMENT_SIZE: usize = 32 * 1024;
46
/// Body of a `MSG_TYPE_REQUEST` message.
///
/// Serialized with named fields (see `encode_request`), so the short field
/// names are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataRequest {
    /// Hash bytes (`create_request` fills this from a `Hash`); encoded through
    /// `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Defaults to `MAX_HTL` when missing on decode and is omitted on encode
    /// when equal to `MAX_HTL`.
    /// NOTE(review): presumably a hops-to-live counter — confirm.
    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
    pub htl: u8,
    /// Quote id, set by `create_request_with_quote`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub q: Option<u64>,
}
60
/// Body of a `MSG_TYPE_RESPONSE` message.
///
/// `i` and `n` are only set for fragment responses (see
/// `create_fragment_response` and `is_fragmented`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataResponse {
    /// Hash bytes; encoded through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Payload bytes; encoded through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub d: Vec<u8>,
    /// Fragment index (`index` in `create_fragment_response`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub i: Option<u32>,
    /// Fragment total (`total` in `create_fragment_response`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub n: Option<u32>,
}
77
/// Body of a `MSG_TYPE_QUOTE_REQUEST` message.
///
/// Field meanings follow the parameter names of `create_quote_request`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataQuoteRequest {
    /// Hash bytes; encoded through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Payment amount (`payment_sat` in `create_quote_request`).
    pub p: u64,
    /// Time to live (`ttl_ms` in `create_quote_request`).
    pub t: u32,
    /// Mint URL (`mint_url` in `create_quote_request`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub m: Option<String>,
}
92
/// Body of a `MSG_TYPE_QUOTE_RESPONSE` message.
///
/// Built by `create_quote_response_available` (all optional fields but `m`
/// set) or `create_quote_response_unavailable` (all optional fields `None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataQuoteResponse {
    /// Hash bytes; encoded through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// `true` for an "available" response, `false` for an "unavailable" one.
    pub a: bool,
    /// Quote id; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub q: Option<u64>,
    /// Payment amount (`payment_sat`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub p: Option<u64>,
    /// Time to live (`ttl_ms`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub t: Option<u32>,
    /// Mint URL; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub m: Option<String>,
}
114
/// Body of a `MSG_TYPE_PAYMENT` message.
///
/// NOTE(review): this file has no constructor for this struct, so the field
/// meanings below are inferred from the sibling message types — confirm
/// against the payment protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPayment {
    /// Byte string encoded through `serde_bytes`; presumably the content hash.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Presumably the quote id — confirm.
    pub q: u64,
    /// Presumably a chunk index — confirm.
    pub c: u32,
    /// Presumably the payment amount — confirm.
    pub p: u64,
    /// Presumably the mint URL — confirm. Omitted on encode when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub m: Option<String>,
    /// Presumably the payment token — confirm.
    pub tok: String,
}
127
/// Body of a `MSG_TYPE_PAYMENT_ACK` message.
///
/// NOTE(review): this file has no constructor for this struct, so the field
/// meanings below are inferred from the sibling message types — confirm
/// against the payment protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPaymentAck {
    /// Byte string encoded through `serde_bytes`; presumably the content hash.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Presumably the quote id — confirm.
    pub q: u64,
    /// Presumably a chunk index — confirm.
    pub c: u32,
    /// Presumably "payment accepted" — confirm.
    pub a: bool,
    /// Presumably an error description — confirm. Omitted on encode when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub e: Option<String>,
}
139
/// Body of a `MSG_TYPE_CHUNK` message.
///
/// NOTE(review): this file has no constructor for this struct, so the field
/// meanings below are inferred from the sibling message types — confirm
/// against the payment protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunk {
    /// Byte string encoded through `serde_bytes`; presumably the content hash.
    #[serde(with = "serde_bytes")]
    pub h: Vec<u8>,
    /// Presumably the quote id — confirm.
    pub q: u64,
    /// Presumably the chunk index — confirm.
    pub c: u32,
    /// Presumably the total number of chunks — confirm.
    pub n: u32,
    /// Presumably a payment amount — confirm.
    pub p: u64,
    /// Byte string encoded through `serde_bytes`; presumably the chunk payload.
    #[serde(with = "serde_bytes")]
    pub d: Vec<u8>,
}
152
/// Body of a `MSG_TYPE_PEER_HINTS` message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerHints {
    /// List of URLs, carried on the wire under the key `"u"`. Decodes to an
    /// empty list when the key is missing.
    #[serde(default, rename = "u")]
    pub signal_urls: Vec<String>,
}
160
/// Body of a `MSG_TYPE_PUBSUB_INTEREST` message.
///
/// Fields have descriptive Rust names and are renamed to short keys on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubsubInterest {
    /// Stream identifier; wire key `"s"`.
    #[serde(rename = "s")]
    pub stream_id: String,
    /// Peer id of the subscriber; wire key `"sub"`.
    #[serde(rename = "sub")]
    pub subscriber_peer_id: String,
    /// Sequence number; wire key `"q"`.
    #[serde(rename = "q")]
    pub seq: u64,
    /// Active flag; wire key `"a"`.
    /// NOTE(review): presumably `false` withdraws the interest — confirm.
    #[serde(rename = "a")]
    pub active: bool,
    /// Defaults to `MAX_HTL` when missing on decode and is omitted on encode
    /// when equal to `MAX_HTL`.
    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
    pub htl: u8,
}
180
/// Body of a `MSG_TYPE_PUBSUB_FRAME` message.
///
/// Fields have descriptive Rust names and are renamed to short keys on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubsubFrame {
    /// Stream identifier; wire key `"s"`.
    #[serde(rename = "s")]
    pub stream_id: String,
    /// Sequence number; wire key `"q"`.
    #[serde(rename = "q")]
    pub seq: u64,
    /// Peer id of the origin; wire key `"o"`.
    #[serde(rename = "o")]
    pub origin_peer_id: String,
    /// Defaults to `MAX_HTL` when missing on decode and is omitted on encode
    /// when equal to `MAX_HTL`.
    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
    pub htl: u8,
    /// Payload bytes; wire key `"d"`, encoded through `serde_bytes`.
    #[serde(with = "serde_bytes", rename = "d")]
    pub payload: Vec<u8>,
}
200
/// Body of a `MSG_TYPE_PUBSUB_INVENTORY` message.
///
/// Carries the same identifying fields as [`PubsubFrame`] but a byte count
/// instead of the payload itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubsubInventory {
    /// Stream identifier; wire key `"s"`.
    #[serde(rename = "s")]
    pub stream_id: String,
    /// Sequence number; wire key `"q"`.
    #[serde(rename = "q")]
    pub seq: u64,
    /// Peer id of the origin; wire key `"o"`.
    #[serde(rename = "o")]
    pub origin_peer_id: String,
    /// Payload size in bytes; wire key `"b"`.
    #[serde(rename = "b")]
    pub payload_bytes: u64,
    /// Defaults to `MAX_HTL` when missing on decode and is omitted on encode
    /// when equal to `MAX_HTL`.
    #[serde(default = "default_htl", skip_serializing_if = "is_max_htl")]
    pub htl: u8,
}
220
/// Body of a `MSG_TYPE_PUBSUB_WANT` message.
///
/// Unlike the other pubsub messages, this one has no `htl` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubsubWant {
    /// Stream identifier; wire key `"s"`.
    #[serde(rename = "s")]
    pub stream_id: String,
    /// Sequence number; wire key `"q"`.
    #[serde(rename = "q")]
    pub seq: u64,
    /// Peer id of the origin; wire key `"o"`.
    #[serde(rename = "o")]
    pub origin_peer_id: String,
}
234
/// A decoded message, with one variant per `MSG_TYPE_*` tag.
///
/// This is the return type of `parse_message`. The enum itself is not
/// serializable; encoding goes through the per-type `encode_*` functions.
#[derive(Debug, Clone)]
pub enum DataMessage {
    /// Decoded from `MSG_TYPE_REQUEST`.
    Request(DataRequest),
    /// Decoded from `MSG_TYPE_RESPONSE`.
    Response(DataResponse),
    /// Decoded from `MSG_TYPE_QUOTE_REQUEST`.
    QuoteRequest(DataQuoteRequest),
    /// Decoded from `MSG_TYPE_QUOTE_RESPONSE`.
    QuoteResponse(DataQuoteResponse),
    /// Decoded from `MSG_TYPE_PAYMENT`.
    Payment(DataPayment),
    /// Decoded from `MSG_TYPE_PAYMENT_ACK`.
    PaymentAck(DataPaymentAck),
    /// Decoded from `MSG_TYPE_CHUNK`.
    Chunk(DataChunk),
    /// Decoded from `MSG_TYPE_PEER_HINTS`.
    PeerHints(PeerHints),
    /// Decoded from `MSG_TYPE_PUBSUB_INTEREST`.
    PubsubInterest(PubsubInterest),
    /// Decoded from `MSG_TYPE_PUBSUB_FRAME`.
    PubsubFrame(PubsubFrame),
    /// Decoded from `MSG_TYPE_PUBSUB_INVENTORY`.
    PubsubInventory(PubsubInventory),
    /// Decoded from `MSG_TYPE_PUBSUB_WANT`.
    PubsubWant(PubsubWant),
}
251
252pub fn encode_request(req: &DataRequest) -> Vec<u8> {
255 let body = rmp_serde::to_vec_named(req).expect("Failed to encode request");
256 let mut result = Vec::with_capacity(1 + body.len());
257 result.push(MSG_TYPE_REQUEST);
258 result.extend(body);
259 result
260}
261
262pub fn encode_response(res: &DataResponse) -> Vec<u8> {
265 let body = rmp_serde::to_vec_named(res).expect("Failed to encode response");
266 let mut result = Vec::with_capacity(1 + body.len());
267 result.push(MSG_TYPE_RESPONSE);
268 result.extend(body);
269 result
270}
271
272pub fn encode_quote_request(req: &DataQuoteRequest) -> Vec<u8> {
274 let body = rmp_serde::to_vec_named(req).expect("Failed to encode quote request");
275 let mut result = Vec::with_capacity(1 + body.len());
276 result.push(MSG_TYPE_QUOTE_REQUEST);
277 result.extend(body);
278 result
279}
280
281pub fn encode_quote_response(res: &DataQuoteResponse) -> Vec<u8> {
283 let body = rmp_serde::to_vec_named(res).expect("Failed to encode quote response");
284 let mut result = Vec::with_capacity(1 + body.len());
285 result.push(MSG_TYPE_QUOTE_RESPONSE);
286 result.extend(body);
287 result
288}
289
290pub fn encode_payment(req: &DataPayment) -> Vec<u8> {
292 let body = rmp_serde::to_vec_named(req).expect("Failed to encode payment");
293 let mut result = Vec::with_capacity(1 + body.len());
294 result.push(MSG_TYPE_PAYMENT);
295 result.extend(body);
296 result
297}
298
299pub fn encode_payment_ack(res: &DataPaymentAck) -> Vec<u8> {
301 let body = rmp_serde::to_vec_named(res).expect("Failed to encode payment ack");
302 let mut result = Vec::with_capacity(1 + body.len());
303 result.push(MSG_TYPE_PAYMENT_ACK);
304 result.extend(body);
305 result
306}
307
308pub fn encode_chunk(chunk: &DataChunk) -> Vec<u8> {
310 let body = rmp_serde::to_vec_named(chunk).expect("Failed to encode chunk");
311 let mut result = Vec::with_capacity(1 + body.len());
312 result.push(MSG_TYPE_CHUNK);
313 result.extend(body);
314 result
315}
316
317pub fn encode_peer_hints(hints: &PeerHints) -> Vec<u8> {
319 let body = rmp_serde::to_vec_named(hints).expect("Failed to encode peer hints");
320 let mut result = Vec::with_capacity(1 + body.len());
321 result.push(MSG_TYPE_PEER_HINTS);
322 result.extend(body);
323 result
324}
325
326pub fn encode_pubsub_interest(interest: &PubsubInterest) -> Vec<u8> {
328 let body = rmp_serde::to_vec_named(interest).expect("Failed to encode pubsub interest");
329 let mut result = Vec::with_capacity(1 + body.len());
330 result.push(MSG_TYPE_PUBSUB_INTEREST);
331 result.extend(body);
332 result
333}
334
335pub fn encode_pubsub_frame(frame: &PubsubFrame) -> Vec<u8> {
337 let body = rmp_serde::to_vec_named(frame).expect("Failed to encode pubsub frame");
338 let mut result = Vec::with_capacity(1 + body.len());
339 result.push(MSG_TYPE_PUBSUB_FRAME);
340 result.extend(body);
341 result
342}
343
344pub fn encode_pubsub_inventory(inv: &PubsubInventory) -> Vec<u8> {
346 let body = rmp_serde::to_vec_named(inv).expect("Failed to encode pubsub inventory");
347 let mut result = Vec::with_capacity(1 + body.len());
348 result.push(MSG_TYPE_PUBSUB_INVENTORY);
349 result.extend(body);
350 result
351}
352
353pub fn encode_pubsub_want(want: &PubsubWant) -> Vec<u8> {
355 let body = rmp_serde::to_vec_named(want).expect("Failed to encode pubsub want");
356 let mut result = Vec::with_capacity(1 + body.len());
357 result.push(MSG_TYPE_PUBSUB_WANT);
358 result.extend(body);
359 result
360}
361
362pub fn parse_message(data: &[u8]) -> Option<DataMessage> {
364 if data.len() < 2 {
365 return None;
366 }
367
368 let msg_type = data[0];
369 let body = &data[1..];
370
371 match msg_type {
372 MSG_TYPE_REQUEST => rmp_serde::from_slice::<DataRequest>(body)
373 .ok()
374 .map(DataMessage::Request),
375 MSG_TYPE_RESPONSE => rmp_serde::from_slice::<DataResponse>(body)
376 .ok()
377 .map(DataMessage::Response),
378 MSG_TYPE_QUOTE_REQUEST => rmp_serde::from_slice::<DataQuoteRequest>(body)
379 .ok()
380 .map(DataMessage::QuoteRequest),
381 MSG_TYPE_QUOTE_RESPONSE => rmp_serde::from_slice::<DataQuoteResponse>(body)
382 .ok()
383 .map(DataMessage::QuoteResponse),
384 MSG_TYPE_PAYMENT => rmp_serde::from_slice::<DataPayment>(body)
385 .ok()
386 .map(DataMessage::Payment),
387 MSG_TYPE_PAYMENT_ACK => rmp_serde::from_slice::<DataPaymentAck>(body)
388 .ok()
389 .map(DataMessage::PaymentAck),
390 MSG_TYPE_CHUNK => rmp_serde::from_slice::<DataChunk>(body)
391 .ok()
392 .map(DataMessage::Chunk),
393 MSG_TYPE_PEER_HINTS => rmp_serde::from_slice::<PeerHints>(body)
394 .ok()
395 .map(DataMessage::PeerHints),
396 MSG_TYPE_PUBSUB_INTEREST => rmp_serde::from_slice::<PubsubInterest>(body)
397 .ok()
398 .map(DataMessage::PubsubInterest),
399 MSG_TYPE_PUBSUB_FRAME => rmp_serde::from_slice::<PubsubFrame>(body)
400 .ok()
401 .map(DataMessage::PubsubFrame),
402 MSG_TYPE_PUBSUB_INVENTORY => rmp_serde::from_slice::<PubsubInventory>(body)
403 .ok()
404 .map(DataMessage::PubsubInventory),
405 MSG_TYPE_PUBSUB_WANT => rmp_serde::from_slice::<PubsubWant>(body)
406 .ok()
407 .map(DataMessage::PubsubWant),
408 _ => None,
409 }
410}
411
412pub fn create_request(hash: &Hash, htl: u8) -> DataRequest {
414 DataRequest {
415 h: hash.to_vec(),
416 htl,
417 q: None,
418 }
419}
420
421pub fn create_request_with_quote(hash: &Hash, htl: u8, quote_id: u64) -> DataRequest {
423 DataRequest {
424 h: hash.to_vec(),
425 htl,
426 q: Some(quote_id),
427 }
428}
429
430pub fn create_response(hash: &Hash, data: Vec<u8>) -> DataResponse {
432 DataResponse {
433 h: hash.to_vec(),
434 d: data,
435 i: None,
436 n: None,
437 }
438}
439
440pub fn create_pubsub_interest(
442 stream_id: impl Into<String>,
443 subscriber_peer_id: impl Into<String>,
444 seq: u64,
445 active: bool,
446 htl: u8,
447) -> PubsubInterest {
448 PubsubInterest {
449 stream_id: stream_id.into(),
450 subscriber_peer_id: subscriber_peer_id.into(),
451 seq,
452 active,
453 htl,
454 }
455}
456
457pub fn create_pubsub_frame(
459 stream_id: impl Into<String>,
460 seq: u64,
461 origin_peer_id: impl Into<String>,
462 payload: Vec<u8>,
463 htl: u8,
464) -> PubsubFrame {
465 PubsubFrame {
466 stream_id: stream_id.into(),
467 seq,
468 origin_peer_id: origin_peer_id.into(),
469 payload,
470 htl,
471 }
472}
473
474pub fn create_pubsub_inventory(
476 stream_id: impl Into<String>,
477 seq: u64,
478 origin_peer_id: impl Into<String>,
479 payload_bytes: u64,
480 htl: u8,
481) -> PubsubInventory {
482 PubsubInventory {
483 stream_id: stream_id.into(),
484 seq,
485 origin_peer_id: origin_peer_id.into(),
486 payload_bytes,
487 htl,
488 }
489}
490
491pub fn create_pubsub_want(
493 stream_id: impl Into<String>,
494 seq: u64,
495 origin_peer_id: impl Into<String>,
496) -> PubsubWant {
497 PubsubWant {
498 stream_id: stream_id.into(),
499 seq,
500 origin_peer_id: origin_peer_id.into(),
501 }
502}
503
504pub fn create_quote_request(
506 hash: &Hash,
507 ttl_ms: u32,
508 payment_sat: u64,
509 mint_url: Option<&str>,
510) -> DataQuoteRequest {
511 DataQuoteRequest {
512 h: hash.to_vec(),
513 p: payment_sat,
514 t: ttl_ms,
515 m: mint_url.map(str::to_string),
516 }
517}
518
519pub fn create_quote_response_available(
521 hash: &Hash,
522 quote_id: u64,
523 payment_sat: u64,
524 ttl_ms: u32,
525 mint_url: Option<&str>,
526) -> DataQuoteResponse {
527 DataQuoteResponse {
528 h: hash.to_vec(),
529 a: true,
530 q: Some(quote_id),
531 p: Some(payment_sat),
532 t: Some(ttl_ms),
533 m: mint_url.map(str::to_string),
534 }
535}
536
537pub fn create_quote_response_unavailable(hash: &Hash) -> DataQuoteResponse {
539 DataQuoteResponse {
540 h: hash.to_vec(),
541 a: false,
542 q: None,
543 p: None,
544 t: None,
545 m: None,
546 }
547}
548
549pub fn create_fragment_response(
551 hash: &Hash,
552 data: Vec<u8>,
553 index: u32,
554 total: u32,
555) -> DataResponse {
556 DataResponse {
557 h: hash.to_vec(),
558 d: data,
559 i: Some(index),
560 n: Some(total),
561 }
562}
563
564pub fn is_fragmented(res: &DataResponse) -> bool {
566 res.i.is_some() && res.n.is_some()
567}
568
/// Returns the hex encoding of `hash`, as produced by `hex::encode`.
///
/// Accepts any byte slice; the length is not checked.
pub fn hash_to_key(hash: &[u8]) -> String {
    hex::encode(hash)
}
573
/// Copies `hash` into a freshly allocated `Vec<u8>`.
pub fn hash_to_bytes(hash: &Hash) -> Vec<u8> {
    hash.to_vec()
}
578
579pub fn bytes_to_hash(bytes: &[u8]) -> Option<Hash> {
581 if bytes.len() == 32 {
582 let mut hash = [0u8; 32];
583 hash.copy_from_slice(bytes);
584 Some(hash)
585 } else {
586 None
587 }
588}
589
/// Round-trip tests: each message type is encoded with its `encode_*`
/// function, decoded with `parse_message`, and compared field by field.
#[cfg(test)]
mod tests {
    use super::*;

    /// A request with an explicit `htl` survives an encode/decode round trip.
    #[test]
    fn test_encode_decode_request() {
        let hash = [0xab; 32];
        let req = create_request(&hash, 10);
        let encoded = encode_request(&req);

        assert_eq!(encoded[0], MSG_TYPE_REQUEST);

        let parsed = parse_message(&encoded).unwrap();
        match parsed {
            DataMessage::Request(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.htl, 10);
            }
            _ => panic!("Expected request"),
        }
    }

    /// A request body that has no `htl` entry at all decodes with `MAX_HTL`.
    #[test]
    fn test_decode_request_without_explicit_htl_defaults_to_max() {
        // Stand-in for a sender whose request struct only has the `h` field.
        #[derive(Serialize)]
        struct LegacyRequest {
            #[serde(with = "serde_bytes")]
            h: Vec<u8>,
        }

        let hash = [0x21; 32];
        let body = rmp_serde::to_vec_named(&LegacyRequest { h: hash.to_vec() }).unwrap();
        let mut encoded = vec![MSG_TYPE_REQUEST];
        encoded.extend(body);

        let parsed = parse_message(&encoded).unwrap();
        match parsed {
            DataMessage::Request(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.htl, crate::types::MAX_HTL);
            }
            _ => panic!("Expected request"),
        }
    }

    /// An unfragmented response round-trips and is not reported as fragmented.
    #[test]
    fn test_encode_decode_response() {
        let hash = [0xcd; 32];
        let data = vec![1, 2, 3, 4, 5];
        let res = create_response(&hash, data.clone());
        let encoded = encode_response(&res);

        assert_eq!(encoded[0], MSG_TYPE_RESPONSE);

        let parsed = parse_message(&encoded).unwrap();
        match parsed {
            DataMessage::Response(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.d, data);
                assert!(!is_fragmented(&r));
            }
            _ => panic!("Expected response"),
        }
    }

    /// A fragment response keeps its index and total across a round trip.
    #[test]
    fn test_encode_decode_fragment_response() {
        let hash = [0xef; 32];
        let data = vec![10, 20, 30];
        let res = create_fragment_response(&hash, data.clone(), 2, 5);
        let encoded = encode_response(&res);

        let parsed = parse_message(&encoded).unwrap();
        match parsed {
            DataMessage::Response(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.d, data);
                assert!(is_fragmented(&r));
                assert_eq!(r.i, Some(2));
                assert_eq!(r.n, Some(5));
            }
            _ => panic!("Expected response"),
        }
    }

    /// A quote request round-trips with its ttl, payment and mint URL intact.
    #[test]
    fn test_encode_decode_quote_request() {
        let hash = [0x44; 32];
        let req = create_quote_request(&hash, 7, 2_500, Some("https://mint.example"));
        let encoded = encode_quote_request(&req);

        assert_eq!(encoded[0], MSG_TYPE_QUOTE_REQUEST);

        let parsed = parse_message(&encoded).unwrap();
        match parsed {
            DataMessage::QuoteRequest(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.t, 7);
                assert_eq!(r.p, 2_500);
                assert_eq!(r.m.as_deref(), Some("https://mint.example"));
            }
            _ => panic!("Expected quote request"),
        }
    }

    /// An "available" quote response round-trips, and a request that carries
    /// the same quote id round-trips as well.
    #[test]
    fn test_encode_decode_quote_response_and_quoted_request() {
        let hash = [0x55; 32];
        let quote =
            create_quote_response_available(&hash, 19, 2_500, 7, Some("https://mint.example"));
        let encoded_quote = encode_quote_response(&quote);

        assert_eq!(encoded_quote[0], MSG_TYPE_QUOTE_RESPONSE);

        let parsed_quote = parse_message(&encoded_quote).unwrap();
        match parsed_quote {
            DataMessage::QuoteResponse(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert!(r.a);
                assert_eq!(r.q, Some(19));
                assert_eq!(r.p, Some(2_500));
                assert_eq!(r.t, Some(7));
                assert_eq!(r.m.as_deref(), Some("https://mint.example"));
            }
            _ => panic!("Expected quote response"),
        }

        let req = create_request_with_quote(&hash, 9, 19);
        let encoded_req = encode_request(&req);
        let parsed_req = parse_message(&encoded_req).unwrap();
        match parsed_req {
            DataMessage::Request(r) => {
                assert_eq!(r.h, hash.to_vec());
                assert_eq!(r.htl, 9);
                assert_eq!(r.q, Some(19));
            }
            _ => panic!("Expected quoted request"),
        }
    }

    /// `hash_to_bytes` followed by `bytes_to_hash` returns the original hash.
    #[test]
    fn test_hash_conversions() {
        let hash = [0x12; 32];
        let bytes = hash_to_bytes(&hash);
        let back = bytes_to_hash(&bytes).unwrap();
        assert_eq!(hash, back);
    }

    /// Payment, payment-ack and chunk messages each round-trip.
    #[test]
    fn test_encode_decode_payment_ack_and_chunk() {
        let payment = DataPayment {
            h: vec![0x61; 32],
            q: 9,
            c: 1,
            p: 3,
            m: Some("https://mint.example".to_string()),
            tok: "cashuBtoken".to_string(),
        };
        let payment_ack = DataPaymentAck {
            h: vec![0x62; 32],
            q: 9,
            c: 1,
            a: true,
            e: None,
        };
        let chunk = DataChunk {
            h: vec![0x63; 32],
            q: 9,
            c: 1,
            n: 2,
            p: 3,
            d: vec![1, 2, 3],
        };

        match parse_message(&encode_payment(&payment)).unwrap() {
            DataMessage::Payment(parsed) => {
                assert_eq!(parsed.q, payment.q);
                assert_eq!(parsed.p, payment.p);
                assert_eq!(parsed.m, payment.m);
            }
            _ => panic!("Expected payment"),
        }

        match parse_message(&encode_payment_ack(&payment_ack)).unwrap() {
            DataMessage::PaymentAck(parsed) => {
                assert_eq!(parsed.q, payment_ack.q);
                assert_eq!(parsed.c, payment_ack.c);
                assert!(parsed.a);
            }
            _ => panic!("Expected payment ack"),
        }

        match parse_message(&encode_chunk(&chunk)).unwrap() {
            DataMessage::Chunk(parsed) => {
                assert_eq!(parsed.q, chunk.q);
                assert_eq!(parsed.n, chunk.n);
                assert_eq!(parsed.d, chunk.d);
            }
            _ => panic!("Expected chunk"),
        }
    }

    /// Peer hints round-trip with their URL list intact.
    #[test]
    fn test_encode_decode_peer_hints() {
        let hints = PeerHints {
            signal_urls: vec!["http://127.0.0.1:18080".to_string()],
        };

        match parse_message(&encode_peer_hints(&hints)).unwrap() {
            DataMessage::PeerHints(parsed) => {
                assert_eq!(parsed.signal_urls, hints.signal_urls);
            }
            _ => panic!("Expected peer hints"),
        }
    }

    /// All four pubsub message types round-trip.
    #[test]
    fn test_encode_decode_pubsub_messages() {
        let interest = create_pubsub_interest("author:alice", "subscriber-a", 42, true, 5);
        match parse_message(&encode_pubsub_interest(&interest)).unwrap() {
            DataMessage::PubsubInterest(parsed) => {
                assert_eq!(parsed.stream_id, "author:alice");
                assert_eq!(parsed.subscriber_peer_id, "subscriber-a");
                assert_eq!(parsed.seq, 42);
                assert!(parsed.active);
                assert_eq!(parsed.htl, 5);
            }
            _ => panic!("Expected pubsub interest"),
        }

        let frame = create_pubsub_frame("author:alice", 7, "publisher-a", vec![1, 2, 3], 4);
        match parse_message(&encode_pubsub_frame(&frame)).unwrap() {
            DataMessage::PubsubFrame(parsed) => {
                assert_eq!(parsed.stream_id, "author:alice");
                assert_eq!(parsed.seq, 7);
                assert_eq!(parsed.origin_peer_id, "publisher-a");
                assert_eq!(parsed.htl, 4);
                assert_eq!(parsed.payload, vec![1, 2, 3]);
            }
            _ => panic!("Expected pubsub frame"),
        }

        let inv = create_pubsub_inventory("author:alice", 7, "publisher-a", 3, 4);
        match parse_message(&encode_pubsub_inventory(&inv)).unwrap() {
            DataMessage::PubsubInventory(parsed) => {
                assert_eq!(parsed.stream_id, "author:alice");
                assert_eq!(parsed.seq, 7);
                assert_eq!(parsed.origin_peer_id, "publisher-a");
                assert_eq!(parsed.payload_bytes, 3);
                assert_eq!(parsed.htl, 4);
            }
            _ => panic!("Expected pubsub inventory"),
        }

        let want = create_pubsub_want("author:alice", 7, "publisher-a");
        match parse_message(&encode_pubsub_want(&want)).unwrap() {
            DataMessage::PubsubWant(parsed) => {
                assert_eq!(parsed.stream_id, "author:alice");
                assert_eq!(parsed.seq, 7);
                assert_eq!(parsed.origin_peer_id, "publisher-a");
            }
            _ => panic!("Expected pubsub want"),
        }
    }
}