Skip to main content

ant_protocol/
chunk.rs

1//! Chunk message types for the ANT protocol.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content. Maximum size is 4MB.
5//!
6//! This module defines the wire protocol messages for chunk operations
7//! using postcard serialization for compact, fast encoding.
8
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12/// Protocol identifier for chunk operations.
13pub const CHUNK_PROTOCOL_ID: &str = "autonomi.ant.chunk.v1";
14
15/// Current protocol version.
16pub const PROTOCOL_VERSION: u16 = 1;
17
18/// Maximum chunk size in bytes (4MB).
19pub const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024;
20
21/// Maximum wire message size in bytes (5MB).
22///
23/// Limits the input buffer accepted by [`ChunkMessage::decode`] to prevent
24/// unbounded allocation from malicious or corrupted payloads. Set slightly
25/// above [`MAX_CHUNK_SIZE`] to accommodate message envelope overhead.
26pub const MAX_WIRE_MESSAGE_SIZE: usize = 5 * 1024 * 1024;
27
28/// Data type identifier for chunks.
29pub const DATA_TYPE_CHUNK: u32 = 0;
30
31/// Number of nodes in a Kademlia close group.
32///
33/// Clients fetch quotes from the `CLOSE_GROUP_SIZE` closest nodes to a target
34/// address and select the median-priced quote for payment.
35pub const CLOSE_GROUP_SIZE: usize = 7;
36
37/// Minimum number of close group members that must agree for a decision to be valid.
38///
39/// This is a simple majority: `(CLOSE_GROUP_SIZE / 2) + 1`.
40pub const CLOSE_GROUP_MAJORITY: usize = (CLOSE_GROUP_SIZE / 2) + 1;
41
42/// Content-addressed identifier (32 bytes).
43pub type XorName = [u8; 32];
44
45/// Byte length of an [`XorName`].
46pub const XORNAME_LEN: usize = std::mem::size_of::<XorName>();
47
48/// Enum of all chunk protocol message types.
49///
50/// Uses a single-byte discriminant for efficient wire encoding.
51///
52/// Marked `#[non_exhaustive]` so new message variants can be added
53/// in a minor release without breaking downstream `match` expressions.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[non_exhaustive]
56pub enum ChunkMessageBody {
57    /// Request to store a chunk.
58    PutRequest(ChunkPutRequest),
59    /// Response to a PUT request.
60    PutResponse(ChunkPutResponse),
61    /// Request to retrieve a chunk.
62    GetRequest(ChunkGetRequest),
63    /// Response to a GET request.
64    GetResponse(ChunkGetResponse),
65    /// Request a storage quote.
66    QuoteRequest(ChunkQuoteRequest),
67    /// Response with a storage quote.
68    QuoteResponse(ChunkQuoteResponse),
69    /// Request a merkle candidate quote for batch payments.
70    MerkleCandidateQuoteRequest(MerkleCandidateQuoteRequest),
71    /// Response with a merkle candidate quote.
72    MerkleCandidateQuoteResponse(MerkleCandidateQuoteResponse),
73}
74
75/// Wire-format wrapper that pairs a sender-assigned `request_id` with
76/// a [`ChunkMessageBody`].
77///
78/// The sender picks a unique `request_id`; the handler echoes it back
79/// in the response so callers can correlate replies by ID rather than
80/// by source peer.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ChunkMessage {
83    /// Sender-assigned identifier, echoed back in the response.
84    pub request_id: u64,
85    /// The protocol message body.
86    pub body: ChunkMessageBody,
87}
88
89impl ChunkMessage {
90    /// Encode the message to bytes using postcard.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if serialization fails.
95    pub fn encode(&self) -> Result<Vec<u8>, ProtocolError> {
96        postcard::to_stdvec(self).map_err(|e| ProtocolError::SerializationFailed(e.to_string()))
97    }
98
99    /// Decode a message from bytes using postcard.
100    ///
101    /// Rejects payloads larger than [`MAX_WIRE_MESSAGE_SIZE`] before
102    /// attempting deserialization.
103    ///
104    /// # Errors
105    ///
106    /// Returns [`ProtocolError::MessageTooLarge`] if the input exceeds the
107    /// size limit, or [`ProtocolError::DeserializationFailed`] if postcard
108    /// cannot parse the data.
109    pub fn decode(data: &[u8]) -> Result<Self, ProtocolError> {
110        if data.len() > MAX_WIRE_MESSAGE_SIZE {
111            return Err(ProtocolError::MessageTooLarge {
112                size: data.len(),
113                max_size: MAX_WIRE_MESSAGE_SIZE,
114            });
115        }
116        postcard::from_bytes(data).map_err(|e| ProtocolError::DeserializationFailed(e.to_string()))
117    }
118}
119
120// =============================================================================
121// PUT Request/Response
122// =============================================================================
123
124/// Request to store a chunk.
125///
126/// `content` is held as `bytes::Bytes` so that callers fanning the same
127/// chunk out to multiple recipients (e.g. close-group replication) share a
128/// single backing buffer via refcount instead of deep-copying the 4 MB
129/// payload per peer. Wire format is unchanged: `Bytes` serializes as a
130/// byte sequence, identical to `Vec<u8>` under postcard/serde.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct ChunkPutRequest {
133    /// The content-addressed identifier (BLAKE3 of content).
134    pub address: XorName,
135    /// The chunk data.
136    pub content: Bytes,
137    /// Optional payment proof (serialized `ProofOfPayment`).
138    /// Required for new chunks unless already verified.
139    pub payment_proof: Option<Vec<u8>>,
140}
141
142impl ChunkPutRequest {
143    /// Create a new PUT request.
144    #[must_use]
145    pub fn new(address: XorName, content: Bytes) -> Self {
146        Self {
147            address,
148            content,
149            payment_proof: None,
150        }
151    }
152
153    /// Create a new PUT request with payment proof.
154    #[must_use]
155    pub fn with_payment(address: XorName, content: Bytes, payment_proof: Vec<u8>) -> Self {
156        Self {
157            address,
158            content,
159            payment_proof: Some(payment_proof),
160        }
161    }
162}
163
164/// Response to a PUT request.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166#[non_exhaustive]
167pub enum ChunkPutResponse {
168    /// Chunk stored successfully.
169    Success {
170        /// The address where the chunk was stored.
171        address: XorName,
172    },
173    /// Chunk already exists (idempotent success).
174    AlreadyExists {
175        /// The existing chunk address.
176        address: XorName,
177    },
178    /// Payment is required to store this chunk.
179    PaymentRequired {
180        /// Error message.
181        message: String,
182    },
183    /// An error occurred.
184    Error(ProtocolError),
185}
186
187// =============================================================================
188// GET Request/Response
189// =============================================================================
190
191/// Request to retrieve a chunk.
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct ChunkGetRequest {
194    /// The content-addressed identifier to retrieve.
195    pub address: XorName,
196}
197
198impl ChunkGetRequest {
199    /// Create a new GET request.
200    #[must_use]
201    pub fn new(address: XorName) -> Self {
202        Self { address }
203    }
204}
205
206/// Response to a GET request.
207#[derive(Debug, Clone, Serialize, Deserialize)]
208#[non_exhaustive]
209pub enum ChunkGetResponse {
210    /// Chunk found and returned.
211    Success {
212        /// The chunk address.
213        address: XorName,
214        /// The chunk data.
215        content: Vec<u8>,
216    },
217    /// Chunk not found.
218    NotFound {
219        /// The requested address.
220        address: XorName,
221    },
222    /// An error occurred.
223    Error(ProtocolError),
224}
225
226// =============================================================================
227// Quote Request/Response
228// =============================================================================
229
230/// Request a storage quote for a chunk.
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct ChunkQuoteRequest {
233    /// The content address of the data to store.
234    pub address: XorName,
235    /// Size of the data in bytes.
236    pub data_size: u64,
237    /// Data type identifier (0 for chunks).
238    pub data_type: u32,
239}
240
241impl ChunkQuoteRequest {
242    /// Create a new quote request.
243    #[must_use]
244    pub fn new(address: XorName, data_size: u64) -> Self {
245        Self {
246            address,
247            data_size,
248            data_type: DATA_TYPE_CHUNK,
249        }
250    }
251}
252
253/// Response with a storage quote.
254#[derive(Debug, Clone, Serialize, Deserialize)]
255#[non_exhaustive]
256pub enum ChunkQuoteResponse {
257    /// Quote generated successfully.
258    ///
259    /// When `already_stored` is `true` the node already holds this chunk and no
260    /// payment is required — the client should skip the pay-then-PUT cycle for
261    /// this address. The quote is still included for informational purposes.
262    Success {
263        /// Serialized `PaymentQuote`.
264        quote: Vec<u8>,
265        /// `true` when the chunk already exists on this node (skip payment).
266        already_stored: bool,
267    },
268    /// Quote generation failed.
269    Error(ProtocolError),
270}
271
272// =============================================================================
273// Merkle Candidate Quote Request/Response
274// =============================================================================
275
276/// Request a merkle candidate quote for batch payments.
277///
278/// Part of the merkle batch payment system where clients collect
279/// signed candidate quotes from 16 closest peers per pool.
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct MerkleCandidateQuoteRequest {
282    /// The candidate pool address (hash of midpoint || root || timestamp).
283    pub address: XorName,
284    /// Data type identifier (0 for chunks).
285    pub data_type: u32,
286    /// Size of the data in bytes.
287    pub data_size: u64,
288    /// Client-provided merkle payment timestamp (unix seconds).
289    pub merkle_payment_timestamp: u64,
290}
291
292/// Response with a merkle candidate quote.
293#[derive(Debug, Clone, Serialize, Deserialize)]
294#[non_exhaustive]
295pub enum MerkleCandidateQuoteResponse {
296    /// Candidate quote generated successfully.
297    /// Contains the serialized `MerklePaymentCandidateNode`.
298    Success {
299        /// Serialized `MerklePaymentCandidateNode`.
300        candidate_node: Vec<u8>,
301    },
302    /// Quote generation failed.
303    Error(ProtocolError),
304}
305
306// =============================================================================
307// Payment Proof Type Tags
308// =============================================================================
309
310/// Version byte prefix for payment proof serialization.
311/// Allows the verifier to detect proof type before deserialization.
312pub const PROOF_TAG_SINGLE_NODE: u8 = 0x01;
313/// Version byte prefix for merkle payment proofs.
314pub const PROOF_TAG_MERKLE: u8 = 0x02;
315
316// =============================================================================
317// Protocol Errors
318// =============================================================================
319
320/// Errors that can occur during protocol operations.
321#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
322#[non_exhaustive]
323pub enum ProtocolError {
324    /// Message serialization failed.
325    SerializationFailed(String),
326    /// Message deserialization failed.
327    DeserializationFailed(String),
328    /// Wire message exceeds the maximum allowed size.
329    MessageTooLarge {
330        /// Actual size of the message in bytes.
331        size: usize,
332        /// Maximum allowed size.
333        max_size: usize,
334    },
335    /// Chunk exceeds maximum size.
336    ChunkTooLarge {
337        /// Size of the chunk in bytes.
338        size: usize,
339        /// Maximum allowed size.
340        max_size: usize,
341    },
342    /// Content address mismatch (hash(content) != address).
343    AddressMismatch {
344        /// Expected address.
345        expected: XorName,
346        /// Actual address computed from content.
347        actual: XorName,
348    },
349    /// Storage operation failed.
350    StorageFailed(String),
351    /// Payment verification failed.
352    PaymentFailed(String),
353    /// Quote generation failed.
354    QuoteFailed(String),
355    /// Internal error.
356    Internal(String),
357}
358
359impl std::fmt::Display for ProtocolError {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        match self {
362            Self::SerializationFailed(msg) => write!(f, "serialization failed: {msg}"),
363            Self::DeserializationFailed(msg) => write!(f, "deserialization failed: {msg}"),
364            Self::MessageTooLarge { size, max_size } => {
365                write!(f, "message size {size} exceeds maximum {max_size}")
366            }
367            Self::ChunkTooLarge { size, max_size } => {
368                write!(f, "chunk size {size} exceeds maximum {max_size}")
369            }
370            Self::AddressMismatch { expected, actual } => {
371                write!(
372                    f,
373                    "address mismatch: expected {}, got {}",
374                    hex::encode(expected),
375                    hex::encode(actual)
376                )
377            }
378            Self::StorageFailed(msg) => write!(f, "storage failed: {msg}"),
379            Self::PaymentFailed(msg) => write!(f, "payment failed: {msg}"),
380            Self::QuoteFailed(msg) => write!(f, "quote failed: {msg}"),
381            Self::Internal(msg) => write!(f, "internal error: {msg}"),
382        }
383    }
384}
385
386impl std::error::Error for ProtocolError {}
387
388#[cfg(test)]
389#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
390mod tests {
391    use super::*;
392
393    #[test]
394    fn test_put_request_encode_decode() {
395        let address = [0xAB; 32];
396        let content = Bytes::from_static(&[1, 2, 3, 4, 5]);
397        let request = ChunkPutRequest::new(address, content.clone());
398        let msg = ChunkMessage {
399            request_id: 42,
400            body: ChunkMessageBody::PutRequest(request),
401        };
402
403        let encoded = msg.encode().expect("encode should succeed");
404        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
405
406        assert_eq!(decoded.request_id, 42);
407        if let ChunkMessageBody::PutRequest(req) = decoded.body {
408            assert_eq!(req.address, address);
409            assert_eq!(req.content, content);
410            assert!(req.payment_proof.is_none());
411        } else {
412            panic!("expected PutRequest");
413        }
414    }
415
416    #[test]
417    fn test_put_request_with_payment() {
418        let address = [0xAB; 32];
419        let content = Bytes::from_static(&[1, 2, 3, 4, 5]);
420        let payment = vec![10, 20, 30];
421        let request = ChunkPutRequest::with_payment(address, content.clone(), payment.clone());
422
423        assert_eq!(request.address, address);
424        assert_eq!(request.content, content);
425        assert_eq!(request.payment_proof, Some(payment));
426    }
427
428    #[test]
429    fn test_get_request_encode_decode() {
430        let address = [0xCD; 32];
431        let request = ChunkGetRequest::new(address);
432        let msg = ChunkMessage {
433            request_id: 7,
434            body: ChunkMessageBody::GetRequest(request),
435        };
436
437        let encoded = msg.encode().expect("encode should succeed");
438        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
439
440        assert_eq!(decoded.request_id, 7);
441        if let ChunkMessageBody::GetRequest(req) = decoded.body {
442            assert_eq!(req.address, address);
443        } else {
444            panic!("expected GetRequest");
445        }
446    }
447
448    #[test]
449    fn test_put_response_success() {
450        let address = [0xEF; 32];
451        let response = ChunkPutResponse::Success { address };
452        let msg = ChunkMessage {
453            request_id: 99,
454            body: ChunkMessageBody::PutResponse(response),
455        };
456
457        let encoded = msg.encode().expect("encode should succeed");
458        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
459
460        assert_eq!(decoded.request_id, 99);
461        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
462            decoded.body
463        {
464            assert_eq!(addr, address);
465        } else {
466            panic!("expected PutResponse::Success");
467        }
468    }
469
470    #[test]
471    fn test_get_response_not_found() {
472        let address = [0x12; 32];
473        let response = ChunkGetResponse::NotFound { address };
474        let msg = ChunkMessage {
475            request_id: 0,
476            body: ChunkMessageBody::GetResponse(response),
477        };
478
479        let encoded = msg.encode().expect("encode should succeed");
480        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
481
482        assert_eq!(decoded.request_id, 0);
483        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
484            decoded.body
485        {
486            assert_eq!(addr, address);
487        } else {
488            panic!("expected GetResponse::NotFound");
489        }
490    }
491
492    #[test]
493    fn test_quote_request_encode_decode() {
494        let address = [0x34; 32];
495        let request = ChunkQuoteRequest::new(address, 1024);
496        let msg = ChunkMessage {
497            request_id: 1,
498            body: ChunkMessageBody::QuoteRequest(request),
499        };
500
501        let encoded = msg.encode().expect("encode should succeed");
502        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
503
504        assert_eq!(decoded.request_id, 1);
505        if let ChunkMessageBody::QuoteRequest(req) = decoded.body {
506            assert_eq!(req.address, address);
507            assert_eq!(req.data_size, 1024);
508            assert_eq!(req.data_type, DATA_TYPE_CHUNK);
509        } else {
510            panic!("expected QuoteRequest");
511        }
512    }
513
514    #[test]
515    fn test_protocol_error_display() {
516        let err = ProtocolError::ChunkTooLarge {
517            size: 5_000_000,
518            max_size: MAX_CHUNK_SIZE,
519        };
520        assert!(err.to_string().contains("5000000"));
521        assert!(err.to_string().contains(&MAX_CHUNK_SIZE.to_string()));
522
523        let err = ProtocolError::AddressMismatch {
524            expected: [0xAA; 32],
525            actual: [0xBB; 32],
526        };
527        let display = err.to_string();
528        assert!(display.contains("address mismatch"));
529    }
530
531    #[test]
532    fn test_decode_rejects_oversized_payload() {
533        let oversized = vec![0u8; MAX_WIRE_MESSAGE_SIZE + 1];
534        let result = ChunkMessage::decode(&oversized);
535        assert!(result.is_err());
536        let err = result.unwrap_err();
537        assert!(
538            matches!(err, ProtocolError::MessageTooLarge { .. }),
539            "expected MessageTooLarge, got {err:?}"
540        );
541    }
542
543    #[test]
544    fn test_invalid_decode() {
545        let invalid_data = vec![0xFF, 0xFF, 0xFF];
546        let result = ChunkMessage::decode(&invalid_data);
547        assert!(result.is_err());
548    }
549
550    #[test]
551    fn test_constants() {
552        assert_eq!(CHUNK_PROTOCOL_ID, "autonomi.ant.chunk.v1");
553        assert_eq!(PROTOCOL_VERSION, 1);
554        assert_eq!(MAX_CHUNK_SIZE, 4 * 1024 * 1024);
555        assert_eq!(DATA_TYPE_CHUNK, 0);
556    }
557
558    #[test]
559    fn test_proof_tag_constants() {
560        // Tags must be distinct non-zero bytes
561        assert_ne!(PROOF_TAG_SINGLE_NODE, PROOF_TAG_MERKLE);
562        assert_ne!(PROOF_TAG_SINGLE_NODE, 0x00);
563        assert_ne!(PROOF_TAG_MERKLE, 0x00);
564        assert_eq!(PROOF_TAG_SINGLE_NODE, 0x01);
565        assert_eq!(PROOF_TAG_MERKLE, 0x02);
566    }
567
568    #[test]
569    fn test_merkle_candidate_quote_request_encode_decode() {
570        let address = [0x56; 32];
571        let request = MerkleCandidateQuoteRequest {
572            address,
573            data_type: DATA_TYPE_CHUNK,
574            data_size: 2048,
575            merkle_payment_timestamp: 1_700_000_000,
576        };
577        let msg = ChunkMessage {
578            request_id: 500,
579            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
580        };
581
582        let encoded = msg.encode().expect("encode should succeed");
583        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
584
585        assert_eq!(decoded.request_id, 500);
586        if let ChunkMessageBody::MerkleCandidateQuoteRequest(req) = decoded.body {
587            assert_eq!(req.address, address);
588            assert_eq!(req.data_type, DATA_TYPE_CHUNK);
589            assert_eq!(req.data_size, 2048);
590            assert_eq!(req.merkle_payment_timestamp, 1_700_000_000);
591        } else {
592            panic!("expected MerkleCandidateQuoteRequest");
593        }
594    }
595
596    #[test]
597    fn test_merkle_candidate_quote_response_success_encode_decode() {
598        let candidate_node_bytes = vec![0xAA, 0xBB, 0xCC, 0xDD];
599        let response = MerkleCandidateQuoteResponse::Success {
600            candidate_node: candidate_node_bytes.clone(),
601        };
602        let msg = ChunkMessage {
603            request_id: 501,
604            body: ChunkMessageBody::MerkleCandidateQuoteResponse(response),
605        };
606
607        let encoded = msg.encode().expect("encode should succeed");
608        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
609
610        assert_eq!(decoded.request_id, 501);
611        if let ChunkMessageBody::MerkleCandidateQuoteResponse(
612            MerkleCandidateQuoteResponse::Success { candidate_node },
613        ) = decoded.body
614        {
615            assert_eq!(candidate_node, candidate_node_bytes);
616        } else {
617            panic!("expected MerkleCandidateQuoteResponse::Success");
618        }
619    }
620
621    #[test]
622    fn test_merkle_candidate_quote_response_error_encode_decode() {
623        let error = ProtocolError::QuoteFailed("no libp2p keypair".to_string());
624        let response = MerkleCandidateQuoteResponse::Error(error.clone());
625        let msg = ChunkMessage {
626            request_id: 502,
627            body: ChunkMessageBody::MerkleCandidateQuoteResponse(response),
628        };
629
630        let encoded = msg.encode().expect("encode should succeed");
631        let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed");
632
633        assert_eq!(decoded.request_id, 502);
634        if let ChunkMessageBody::MerkleCandidateQuoteResponse(
635            MerkleCandidateQuoteResponse::Error(err),
636        ) = decoded.body
637        {
638            assert_eq!(err, error);
639        } else {
640            panic!("expected MerkleCandidateQuoteResponse::Error");
641        }
642    }
643}