// buffer/queue.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7    Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// On-disk format version written into (and required in) the manifest footer.
const MANIFEST_VERSION: u16 = 1;
/// Sentinel epoch meaning "consumer not yet initialized"; `initialize` skips
/// this value when incrementing, so it is never stored in a manifest.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;
// Sizes (in bytes) of the fixed-width fields of the serialized format.
const ENTRY_LEN_SIZE: usize = 4;
const LOCATION_LEN_SIZE: usize = 2;
const INGESTION_TIME_MS_SIZE: usize = 8;
const METADATA_LEN_SIZE: usize = 4;
const START_INDEX_SIZE: usize = 4;
const METADATA_COUNT_SIZE: usize = 4;
const ENTRIES_COUNT_SIZE: usize = 4;
const SEQUENCE_SIZE: usize = 8;
const EPOCH_SIZE: usize = 8;
const VERSION_SIZE: usize = 2;
// Footer layout (in order): entries count, next sequence, epoch, version.
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
/// Per-range metadata attached to a batch by the buffer.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    /// Index of the first entry in the batch that this metadata range covers.
    pub start_index: u32,
    /// Wall-clock ingestion time in milliseconds since the Unix epoch.
    pub ingestion_time_ms: i64,
    /// Opaque metadata payload supplied by the caller of [`Producer::produce`](crate::Producer::produce).
    /// Serialized with a u32 length prefix, so it must not exceed `u32::MAX` bytes.
    pub payload: Bytes,
}
36
/// A single queued batch: a storage location plus its per-range metadata.
#[derive(Debug, Clone)]
pub(crate) struct QueueEntry {
    /// Position in the queue; assigned by the manifest when the entry is appended.
    pub(crate) sequence: u64,
    /// Object-store location of the batch (at most `u16::MAX` bytes, enforced in `new`).
    pub(crate) location: String,
    /// Per-range metadata attached by the producer (at most `u32::MAX` items).
    pub(crate) metadata: Vec<Metadata>,
}
43
44impl QueueEntry {
45    fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46        if location.len() > u16::MAX as usize {
47            return Err(Error::InvalidInput(format!(
48                "location length {} exceeds u16::MAX",
49                location.len()
50            )));
51        }
52        if metadata.len() > u32::MAX as usize {
53            return Err(Error::InvalidInput(format!(
54                "metadata count {} exceeds u32::MAX",
55                metadata.len()
56            )));
57        }
58        Ok(Self {
59            sequence: 0,
60            location,
61            metadata,
62        })
63    }
64
65    fn clone_with_sequence(&self, sequence: u64) -> Self {
66        Self {
67            sequence,
68            ..self.clone()
69        }
70    }
71}
72
/// In-memory representation of the serialized queue manifest.
///
/// `data` holds the last fully serialized form (entries followed by a footer);
/// `appended` buffers entries added since. The two are merged lazily in
/// `to_bytes()` so appends avoid copying existing entry bytes.
#[derive(Debug, Clone)]
pub(crate) struct Manifest {
    /// Serialized entries + footer, as read from or written to object storage.
    data: Bytes,
    /// Encoded entries appended locally and not yet merged into `data`.
    appended: BytesMut,
    /// Number of entries currently encoded in `appended`.
    appended_count: usize,
    /// Sequence number the next appended entry will receive.
    next_sequence: u64,
    /// Consumer fencing epoch, mirrored in the footer of `data`.
    epoch: u64,
}
81
impl Manifest {
    /// Create an empty manifest with a valid footer.
    fn empty() -> Self {
        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
        buf.put_u32_le(0); // entries count
        buf.put_u64_le(0); // next sequence
        buf.put_u64_le(0); // epoch
        buf.put_u16_le(MANIFEST_VERSION);
        Self {
            data: buf.freeze(),
            appended: BytesMut::new(),
            appended_count: 0,
            next_sequence: 0,
            epoch: 0,
        }
    }

    /// Wrap raw binary data as a queue manifest, validating the footer.
    ///
    /// Only the footer (version, epoch, next sequence) is decoded here;
    /// entries are deserialized lazily via [`Manifest::iter`].
    pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
        if data.is_empty() {
            return Err(Error::Serialization(
                "queue manifest data must not be empty".to_string(),
            ));
        }
        if data.len() < FOOTER_SIZE {
            return Err(Error::Serialization(
                "queue manifest too short for footer".to_string(),
            ));
        }
        // The footer sits at the end of the buffer; decode fields back-to-front.
        let version_start = data.len() - VERSION_SIZE;
        let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
        if version != MANIFEST_VERSION {
            return Err(Error::Serialization(format!(
                "unsupported queue manifest version: {}",
                version
            )));
        }
        let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
        let epoch = u64::from_le_bytes(
            data[epoch_start..epoch_start + EPOCH_SIZE]
                .try_into()
                .unwrap(),
        );
        let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
        let next_sequence = u64::from_le_bytes(
            data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
                .try_into()
                .unwrap(),
        );
        Ok(Self {
            data,
            appended: BytesMut::new(),
            appended_count: 0,
            next_sequence,
            epoch,
        })
    }

    /// Build a manifest from a slice of entries.
    #[cfg(test)]
    fn from_entries(entries: &[QueueEntry]) -> Self {
        // Next sequence is one past the highest sequence present (0 if empty).
        let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
        let mut buf = BytesMut::new();
        for entry in entries {
            Self::encode_entry(&mut buf, entry).unwrap();
        }
        buf.put_u32_le(entries.len() as u32);
        buf.put_u64_le(next_sequence);
        buf.put_u64_le(0); // epoch
        buf.put_u16_le(MANIFEST_VERSION);
        Self {
            data: buf.freeze(),
            appended: BytesMut::new(),
            appended_count: 0,
            next_sequence,
            epoch: 0,
        }
    }

    /// Number of entries (read from the footer, O(1)).
    fn entries_count(&self) -> usize {
        let base = self.existing_entries_count();
        base + self.appended_count
    }

    /// Whether the manifest contains no entries.
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.entries_count() == 0
    }

    /// Return a borrowing iterator that lazily deserializes entries.
    ///
    /// Iterates base entries from `data` first, then locally appended ones.
    pub(crate) fn iter(&self) -> ManifestIter<'_> {
        let base_count = self.existing_entries_count();
        let entries_end = if self.data.is_empty() {
            0
        } else {
            self.data.len() - FOOTER_SIZE
        };
        ManifestIter {
            data: &self.data,
            offset: 0,
            remaining: base_count,
            entries_end,
            appended: &self.appended,
            appended_offset: 0,
            appended_remaining: self.appended_count,
        }
    }

    /// Entry count stored in the footer of `data` (excludes `appended`).
    fn existing_entries_count(&self) -> usize {
        if self.data.is_empty() {
            0
        } else {
            let footer_start = self.data.len() - FOOTER_SIZE;
            u32::from_le_bytes(
                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
                    .try_into()
                    .unwrap(),
            ) as usize
        }
    }

    /// Append a single entry without copying existing data.
    /// The entry is encoded and stored internally; bytes are merged in `to_bytes()`.
    /// The entry's sequence number is overwritten with the manifest's next sequence.
    fn append(&mut self, entry: &QueueEntry) -> Result<()> {
        let sequenced = entry.clone_with_sequence(self.next_sequence);
        Self::encode_entry(&mut self.appended, &sequenced)?;
        self.next_sequence += 1;
        self.appended_count += 1;
        Ok(())
    }

    /// Remove all entries with sequence <= `through_sequence`, returning them.
    ///
    /// Optimized to avoid deserializing/re-serializing remaining entries: only the
    /// removed entries are fully decoded, while remaining entries are byte-copied.
    fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
        let next_seq = self.next_sequence;
        let epoch = self.epoch;

        let base_count = self.existing_entries_count();
        let entries_end = if self.data.is_empty() {
            0
        } else {
            self.data.len() - FOOTER_SIZE
        };

        // Split the already-serialized entries at the cutoff sequence.
        let (mut removed, remaining_base_start, remaining_base_count) =
            split_entries(&self.data, base_count, entries_end, through_sequence)?;

        // Split the locally appended (not yet merged) entries the same way.
        let appended_end = self.appended.len();
        let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
            &self.appended,
            self.appended_count,
            appended_end,
            through_sequence,
        )?;
        removed.extend(appended_removed);

        // Rebuild `data` from the surviving byte ranges plus a fresh footer.
        let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
        let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
        let total_remaining = remaining_base_count + remaining_appended_count;

        let mut buf = BytesMut::with_capacity(
            remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
        );
        buf.extend_from_slice(remaining_base_bytes);
        buf.extend_from_slice(remaining_appended_bytes);
        buf.put_u32_le(total_remaining);
        buf.put_u64_le(next_seq);
        buf.put_u64_le(epoch);
        buf.put_u16_le(MANIFEST_VERSION);

        self.data = buf.freeze();
        self.appended = BytesMut::new();
        self.appended_count = 0;
        self.next_sequence = next_seq;
        self.epoch = epoch;

        Ok(removed)
    }

    /// Set the epoch and patch the data bytes in place.
    fn set_epoch(&mut self, epoch: u64) {
        self.epoch = epoch;
        // Copy-on-write: rebuild the buffer with the epoch field overwritten.
        let mut buf = BytesMut::from(self.data.as_ref());
        let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
        buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
        self.data = buf.freeze();
    }

    /// Serialize the manifest to bytes for writing to object storage.
    /// When an entry was appended, this merges existing data with the appended
    /// entry and writes a new footer.
    fn to_bytes(&self) -> Result<Bytes> {
        if self.appended.is_empty() {
            // Nothing buffered: the stored bytes are already current.
            return Ok(self.data.clone());
        }
        let (prefix, base_count) = if self.data.is_empty() {
            (&[] as &[u8], 0u32)
        } else {
            let footer_start = self.data.len() - FOOTER_SIZE;
            let count = u32::from_le_bytes(
                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
                    .try_into()
                    .unwrap(),
            );
            // Keep the entry bytes, drop the old footer.
            (&self.data[..footer_start], count)
        };
        let total_count: u32 = base_count
            .checked_add(self.appended_count as u32)
            .ok_or_else(|| {
                Error::Serialization(format!(
                    "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
                    base_count, self.appended_count
                ))
            })?;
        let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
        buf.extend_from_slice(prefix);
        buf.extend_from_slice(&self.appended);
        buf.put_u32_le(total_count);
        buf.put_u64_le(self.next_sequence);
        buf.put_u64_le(self.epoch);
        buf.put_u16_le(MANIFEST_VERSION);
        Ok(buf.freeze())
    }

    /// Encode one entry into `buf` as:
    /// entry_len (u32) | sequence (u64) | location_len (u16) | location bytes |
    /// metadata_count (u32) | per metadata: start_index (u32), ingestion_time_ms (i64),
    /// payload_len (u32), payload bytes. All integers little-endian.
    fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
        // Limits are enforced in QueueEntry::new; these asserts document them.
        debug_assert!(entry.location.len() <= u16::MAX as usize);
        let metadata_size: usize = METADATA_COUNT_SIZE
            + entry
                .metadata
                .iter()
                .map(|m| {
                    START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
                })
                .sum::<usize>();
        let entry_body_len =
            SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
        debug_assert!(entry_body_len <= u32::MAX as usize);
        buf.put_u32_le(entry_body_len as u32);
        buf.put_u64_le(entry.sequence);
        buf.put_u16_le(entry.location.len() as u16);
        buf.extend_from_slice(entry.location.as_bytes());
        debug_assert!(entry.metadata.len() <= u32::MAX as usize);
        buf.put_u32_le(entry.metadata.len() as u32);
        for m in &entry.metadata {
            if m.payload.len() > u32::MAX as usize {
                return Err(Error::InvalidInput(format!(
                    "metadata payload size {} exceeds u32::MAX",
                    m.payload.len()
                )));
            }
            buf.put_u32_le(m.start_index);
            buf.put_i64_le(m.ingestion_time_ms);
            buf.put_u32_le(m.payload.len() as u32);
            buf.extend_from_slice(&m.payload);
        }
        Ok(())
    }
}
345
346/// Walk entries in `data[0..end]`, splitting at `through_sequence`.
347/// Entries with sequence <= through_sequence are fully decoded and returned.
348/// Returns (removed_entries, remaining_start_offset, remaining_count).
349fn split_entries(
350    data: &[u8],
351    count: usize,
352    end: usize,
353    through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355    let mut removed = Vec::new();
356    let mut offset = 0usize;
357
358    for i in 0..count {
359        let entry_start = offset;
360        let entry = decode_entry(data, &mut offset, end)?;
361
362        if entry.sequence <= through_sequence {
363            removed.push(entry);
364        } else {
365            return Ok((removed, entry_start, (count - i) as u32));
366        }
367    }
368
369    Ok((removed, end, 0))
370}
371
372/// Decode a single entry from binary data at the given offset.
373fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374    if *offset + ENTRY_LEN_SIZE > end {
375        return Err(Error::Serialization(
376            "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377        ));
378    }
379
380    let entry_len =
381        u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382    *offset += ENTRY_LEN_SIZE;
383
384    if *offset + entry_len > end {
385        return Err(Error::Serialization(
386            "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387        ));
388    }
389
390    let entry_end = *offset + entry_len;
391
392    let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393    *offset += SEQUENCE_SIZE;
394
395    let location_len = u16::from_le_bytes(
396        data[*offset..*offset + LOCATION_LEN_SIZE]
397            .try_into()
398            .unwrap(),
399    ) as usize;
400    *offset += LOCATION_LEN_SIZE;
401
402    let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403    if entry_len < min_entry_len {
404        return Err(Error::Serialization(format!(
405            "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406            entry_len, min_entry_len, location_len
407        )));
408    }
409
410    let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411        .map_err(|e| Error::Serialization(e.to_string()))?;
412    *offset += location_len;
413
414    let metadata_count = u32::from_le_bytes(
415        data[*offset..*offset + METADATA_COUNT_SIZE]
416            .try_into()
417            .unwrap(),
418    ) as usize;
419    *offset += METADATA_COUNT_SIZE;
420
421    let mut metadata = Vec::with_capacity(metadata_count);
422    for _ in 0..metadata_count {
423        if *offset + START_INDEX_SIZE > end {
424            return Err(Error::Serialization(
425                "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426            ));
427        }
428        let start_index = u32::from_le_bytes(
429            data[*offset..*offset + START_INDEX_SIZE]
430                .try_into()
431                .unwrap(),
432        );
433        *offset += START_INDEX_SIZE;
434
435        if *offset + INGESTION_TIME_MS_SIZE > end {
436            return Err(Error::Serialization(
437                "queue entry corrupt: size of ingestion time field does not fit in entry"
438                    .to_string(),
439            ));
440        }
441        let ingestion_time_ms = i64::from_le_bytes(
442            data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443                .try_into()
444                .unwrap(),
445        );
446        *offset += INGESTION_TIME_MS_SIZE;
447
448        if *offset + METADATA_LEN_SIZE > end {
449            return Err(Error::Serialization(
450                "queue entry corrupt: size of metadata length field does not fit in entry"
451                    .to_string(),
452            ));
453        }
454        let m_len = u32::from_le_bytes(
455            data[*offset..*offset + METADATA_LEN_SIZE]
456                .try_into()
457                .unwrap(),
458        ) as usize;
459        *offset += METADATA_LEN_SIZE;
460
461        if *offset + m_len > end {
462            return Err(Error::Serialization(
463                "queue entry corrupt: metadata has less bytes than set in the metadata length"
464                    .to_string(),
465            ));
466        }
467        metadata.push(Metadata {
468            start_index,
469            ingestion_time_ms,
470            payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471        });
472        *offset += m_len;
473    }
474
475    *offset = entry_end;
476
477    Ok(QueueEntry {
478        sequence,
479        location,
480        metadata,
481    })
482}
483
/// Borrowing iterator over manifest entries. Lazily deserializes each entry.
///
/// Yields the base (already serialized) entries first, then locally appended
/// ones, matching queue order.
pub(crate) struct ManifestIter<'a> {
    /// Serialized base entries (prefix of the manifest's `data`).
    data: &'a [u8],
    /// Current decode position within `data`.
    offset: usize,
    /// Base entries still to be yielded.
    remaining: usize,
    /// Exclusive end of the entry region in `data` (footer excluded).
    entries_end: usize,
    /// Locally appended, not-yet-merged entry bytes.
    appended: &'a [u8],
    /// Current decode position within `appended`.
    appended_offset: usize,
    /// Appended entries still to be yielded.
    appended_remaining: usize,
}
494
495impl Iterator for ManifestIter<'_> {
496    type Item = Result<QueueEntry>;
497
498    fn next(&mut self) -> Option<Self::Item> {
499        if self.remaining > 0 {
500            self.remaining -= 1;
501            Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502        } else if self.appended_remaining > 0 {
503            self.appended_remaining -= 1;
504            Some(decode_entry(
505                self.appended,
506                &mut self.appended_offset,
507                self.appended.len(),
508            ))
509        } else if self.offset != self.entries_end {
510            let err = Some(Err(Error::Serialization(format!(
511                "base entries did not consume all bytes: offset {} != entries_end {}",
512                self.offset, self.entries_end
513            ))));
514            self.offset = self.entries_end;
515            err
516        } else {
517            None
518        }
519    }
520}
521
/// Outcome of a failed conditional manifest write.
enum ManifestWriteError {
    /// The conditional put lost a race (precondition failed or object already
    /// exists); the caller should re-read the manifest and retry.
    Conflict,
    /// A non-retryable error to surface to the caller.
    Fatal(Error),
}
526
/// Handle for reading and conditionally writing the manifest object.
#[derive(Clone)]
pub(crate) struct ManifestStore {
    /// Backing object store holding the manifest.
    pub(crate) object_store: Arc<dyn ObjectStore>,
    /// Path of the manifest object within the store.
    pub(crate) manifest_path: String,
}
532
impl ManifestStore {
    /// Read and parse the manifest from object storage.
    ///
    /// Returns the manifest together with its [`UpdateVersion`] (etag/version)
    /// for use in a subsequent conditional write. A missing object yields an
    /// empty manifest with `None` version, which maps to `PutMode::Create`.
    pub(crate) async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
        let path = Path::from(self.manifest_path.as_str());
        match self.object_store.get(&path).await {
            Ok(result) => {
                // Capture the version before consuming the result for its bytes.
                let version = UpdateVersion {
                    e_tag: result.meta.e_tag.clone(),
                    version: result.meta.version.clone(),
                };
                let bytes = result
                    .bytes()
                    .await
                    .map_err(|e| Error::Storage(e.to_string()))?;
                let manifest = Manifest::from_bytes(bytes)?;
                Ok((manifest, Some(version)))
            }
            // No manifest yet: start empty; `None` forces a create-mode write.
            Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
            Err(e) => Err(Error::Storage(e.to_string())),
        }
    }

    /// Conditionally write the manifest.
    ///
    /// Uses `PutMode::Update` (compare-and-swap against `version`) when a
    /// version is supplied, `PutMode::Create` otherwise. Precondition and
    /// already-exists failures become [`ManifestWriteError::Conflict`] so
    /// callers can re-read and retry; everything else is fatal.
    async fn write(
        &self,
        manifest: &Manifest,
        version: Option<UpdateVersion>,
    ) -> std::result::Result<(), ManifestWriteError> {
        let path = Path::from(self.manifest_path.as_str());
        let put_mode = match version {
            Some(v) => PutMode::Update(v),
            None => PutMode::Create,
        };
        let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;

        match self
            .object_store
            .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
            .await
        {
            Ok(_) => Ok(()),
            Err(ObjectStoreError::Precondition { .. })
            | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
            Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
        }
    }
}
578
/// Lock-free counters tracking write attempts and optimistic-concurrency
/// conflicts, mirrored into the `metrics` registry under a `role` label.
struct ConflictCounter {
    /// Total manifest write attempts.
    write_count: AtomicU64,
    /// Write attempts that lost a conditional-put race.
    conflict_count: AtomicU64,
    /// Metrics label distinguishing producer vs consumer counters.
    role: &'static str,
}
584
585impl ConflictCounter {
586    fn new(role: &'static str) -> Self {
587        Self {
588            write_count: AtomicU64::new(0),
589            conflict_count: AtomicU64::new(0),
590            role,
591        }
592    }
593
594    fn record_write(&self) {
595        self.write_count.fetch_add(1, Ordering::Relaxed);
596        metrics::counter!(crate::metric_names::MANIFEST_WRITES, "role" => self.role).increment(1);
597    }
598
599    fn record_conflict(&self) {
600        self.conflict_count.fetch_add(1, Ordering::Relaxed);
601        metrics::counter!(crate::metric_names::MANIFEST_CONFLICTS, "role" => self.role)
602            .increment(1);
603    }
604
605    fn conflict_rate(&self) -> f64 {
606        let writes = self.write_count.load(Ordering::Relaxed);
607        if writes == 0 {
608            return 0.0;
609        }
610        let conflicts = self.conflict_count.load(Ordering::Relaxed);
611        let rate = (conflicts as f64 / writes as f64) * 100.0;
612        rate.min(100.0)
613    }
614}
615
/// A producer that appends entries to a shared manifest in object storage.
///
/// Writes use optimistic concurrency: the manifest is read, modified locally,
/// and written back with a conditional put. On conflict the operation is
/// retried automatically until it succeeds.
pub struct QueueProducer {
    /// Handle used to read and conditionally write the shared manifest.
    manifest_store: ManifestStore,
    /// Write/conflict statistics, reported under the "producer" role.
    counter: ConflictCounter,
}
625
626impl QueueProducer {
627    /// Create a new producer backed by the given [`ObjectStore`].
628    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
629        Self {
630            manifest_store: ManifestStore {
631                object_store,
632                manifest_path,
633            },
634            counter: ConflictCounter::new("producer"),
635        }
636    }
637
638    /// Append an entry to the queue with the given `location` and `metadata`.
639    ///
640    /// Returns [`Error::InvalidInput`] if `location` exceeds 65 535 bytes or
641    /// `metadata` exceeds 2³²−1 items. The write is retried automatically on
642    /// optimistic-concurrency conflicts.
643    pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
644        let entry = QueueEntry::new(location, metadata)?;
645        loop {
646            let (mut manifest, version) = self.manifest_store.read().await?;
647            manifest.append(&entry)?;
648            self.counter.record_write();
649            match self.manifest_store.write(&manifest, version).await {
650                Ok(()) => return Ok(()),
651                Err(ManifestWriteError::Conflict) => {
652                    self.counter.record_conflict();
653                    continue;
654                }
655                Err(ManifestWriteError::Fatal(e)) => return Err(e),
656            }
657        }
658    }
659
660    /// Return the percentage of manifest writes that encountered a conflict.
661    pub fn conflict_rate(&self) -> f64 {
662        self.counter.conflict_rate()
663    }
664}
665
/// A consumer that reads and dequeues entries from a shared manifest in
/// object storage.
///
/// Single-consumer semantics are enforced through an epoch stored in the
/// manifest. Calling [`QueueConsumer::initialize`] increments the epoch,
/// fencing any previous consumer instance. Every subsequent read or dequeue
/// checks that the local epoch still matches the manifest, returning
/// [`Error::Fenced`] if another consumer has taken over.
pub struct QueueConsumer {
    /// Handle used to read and conditionally write the shared manifest.
    manifest_store: ManifestStore,
    /// Local copy of the fencing epoch; `UNINITIALIZED_EPOCH` until
    /// `initialize` succeeds.
    epoch: AtomicU64,
    /// Write/conflict statistics, reported under the "consumer" role.
    counter: ConflictCounter,
    /// Entry count cached from the last manifest read or successful write.
    queue_len: AtomicU64,
}
680
681impl QueueConsumer {
682    /// Create a new consumer backed by the given [`ObjectStore`].
683    ///
684    /// The consumer is not active until [`QueueConsumer::initialize`] is called.
685    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
686        Self {
687            manifest_store: ManifestStore {
688                object_store,
689                manifest_path,
690            },
691            epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
692            counter: ConflictCounter::new("consumer"),
693            queue_len: AtomicU64::new(0),
694        }
695    }
696
697    /// Initialize the consumer by incrementing the epoch in the queue manifest.
698    /// This fences any previous consumer that was using the old epoch.
699    pub async fn initialize(&self) -> Result<()> {
700        loop {
701            let (mut manifest, version) = self.read_manifest().await?;
702            let mut new_epoch = manifest.epoch.wrapping_add(1);
703            if new_epoch == UNINITIALIZED_EPOCH {
704                new_epoch = new_epoch.wrapping_add(1);
705            }
706            manifest.set_epoch(new_epoch);
707            match self.write_manifest(&manifest, version).await {
708                Ok(()) => {
709                    self.epoch.store(new_epoch, Ordering::Relaxed);
710                    return Ok(());
711                }
712                Err(ManifestWriteError::Conflict) => {
713                    self.counter.record_conflict();
714                    continue;
715                }
716                Err(ManifestWriteError::Fatal(e)) => return Err(e),
717            }
718        }
719    }
720
721    /// Return the first entry in the queue without dequeueing it.
722    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
723    pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
724        let (manifest, _) = self.read_manifest().await?;
725        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
726            return Err(Error::Fenced);
727        }
728        manifest.iter().next().transpose()
729    }
730
731    /// Return the entry with the given sequence number, or None if not found.
732    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
733    pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
734        let (manifest, _) = self.read_manifest().await?;
735        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
736            return Err(Error::Fenced);
737        }
738        manifest
739            .iter()
740            .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
741            .transpose()
742    }
743
744    /// Remove all entries with sequence <= `through_sequence`, returning the removed entries.
745    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
746    pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
747        loop {
748            let (mut manifest, version) = self.read_manifest().await?;
749            if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
750                return Err(Error::Fenced);
751            }
752            let removed = manifest.dequeue(through_sequence)?;
753            match self.write_manifest(&manifest, version).await {
754                Ok(()) => return Ok(removed),
755                Err(ManifestWriteError::Conflict) => {
756                    self.counter.record_conflict();
757                    continue;
758                }
759                Err(ManifestWriteError::Fatal(e)) => return Err(e),
760            }
761        }
762    }
763
764    /// Return the number of entries in the queue as of the last manifest read or write.
765    pub fn len(&self) -> usize {
766        self.queue_len.load(Ordering::Relaxed) as usize
767    }
768
769    async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
770        let result = self.manifest_store.read().await?;
771        self.queue_len
772            .store(result.0.entries_count() as u64, Ordering::Relaxed);
773        Ok(result)
774    }
775
776    async fn write_manifest(
777        &self,
778        manifest: &Manifest,
779        version: Option<UpdateVersion>,
780    ) -> std::result::Result<(), ManifestWriteError> {
781        self.counter.record_write();
782        let result = self.manifest_store.write(manifest, version).await;
783        if result.is_ok() {
784            self.queue_len
785                .store(manifest.entries_count() as u64, Ordering::Relaxed);
786        }
787        result
788    }
789
790    /// Return the percentage of manifest writes that encountered a conflict.
791    pub fn conflict_rate(&self) -> f64 {
792        self.counter.conflict_rate()
793    }
794}
795
/// A single entry in the manifest.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestEntry {
    /// Position of the entry in the queue.
    pub sequence: u64,
    /// Object-store location of the batch.
    pub location: String,
    /// Per-range metadata attached by the producer.
    pub metadata: Vec<Metadata>,
}
803
/// Read-only view of a parsed buffer queue manifest.
#[derive(Debug, Clone)]
pub struct ManifestView {
    /// Consumer fencing epoch stored in the manifest footer.
    pub epoch: u64,
    /// Sequence number the next appended entry would receive.
    pub next_sequence: u64,
    // Fully decoded entries, in queue order.
    entries: Vec<ManifestEntry>,
}
811
812impl ManifestView {
813    /// Return all entries in the manifest.
814    pub fn entries(&self) -> &[ManifestEntry] {
815        &self.entries
816    }
817}
818
819/// Parse a manifest from its binary representation.
820pub fn parse_manifest(data: Bytes) -> Result<ManifestView> {
821    let manifest = Manifest::from_bytes(data)?;
822    let entries = manifest
823        .iter()
824        .map(|r| {
825            r.map(|e| ManifestEntry {
826                sequence: e.sequence,
827                location: e.location,
828                metadata: e.metadata,
829            })
830        })
831        .collect::<Result<Vec<_>>>()?;
832    Ok(ManifestView {
833        epoch: manifest.epoch,
834        next_sequence: manifest.next_sequence,
835        entries,
836    })
837}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842    use slatedb::object_store::memory::InMemory;
843
844    const TEST_MANIFEST_PATH: &str = "test/manifest";
845
    /// Test helper: fetch and parse the manifest object directly from the store.
    async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
        let path = Path::from(path);
        let result = store.get(&path).await.unwrap();
        let bytes = result.bytes().await.unwrap();
        Manifest::from_bytes(bytes).unwrap()
    }
852
    #[tokio::test]
    async fn should_initialize_consumer_and_increment_epoch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let consumer =
            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());

        consumer.initialize().await.unwrap();

        // A fresh manifest starts at epoch 0, so the first initialize writes epoch 1.
        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
        assert_eq!(manifest.epoch, 1);
    }
864
865    #[tokio::test]
866    async fn should_peek_none_when_queue_is_empty() {
867        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
868        let consumer =
869            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
870        consumer.initialize().await.unwrap();
871
872        let result = consumer.peek().await.unwrap();
873        assert!(result.is_none());
874    }
875
876    #[tokio::test]
877    async fn should_read_entry_by_sequence() {
878        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
879        let producer =
880            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
881
882        producer
883            .enqueue("a.batch".to_string(), vec![])
884            .await
885            .unwrap();
886        producer
887            .enqueue("b.batch".to_string(), vec![])
888            .await
889            .unwrap();
890        producer
891            .enqueue("c.batch".to_string(), vec![])
892            .await
893            .unwrap();
894
895        let consumer =
896            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
897        consumer.initialize().await.unwrap();
898
899        let entry = consumer.read(1).await.unwrap().unwrap();
900        assert_eq!(entry.location, "b.batch");
901        assert_eq!(entry.sequence, 1);
902    }
903
904    #[tokio::test]
905    async fn should_read_none_for_missing_sequence() {
906        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
907        let producer =
908            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
909
910        producer
911            .enqueue("a.batch".to_string(), vec![])
912            .await
913            .unwrap();
914
915        let consumer =
916            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
917        consumer.initialize().await.unwrap();
918
919        let result = consumer.read(99).await.unwrap();
920        assert!(result.is_none());
921    }
922
923    #[tokio::test]
924    async fn should_fence_old_consumer_on_peek() {
925        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
926        let consumer_a =
927            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
928        consumer_a.initialize().await.unwrap();
929
930        let consumer_b =
931            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
932        consumer_b.initialize().await.unwrap();
933
934        let result = consumer_a.peek().await;
935        assert!(matches!(result, Err(Error::Fenced)));
936    }
937
938    #[tokio::test]
939    async fn should_fence_old_consumer_on_dequeue() {
940        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
941        let consumer_a =
942            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
943        consumer_a.initialize().await.unwrap();
944
945        let consumer_b =
946            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
947        consumer_b.initialize().await.unwrap();
948
949        let result = consumer_a.dequeue(0).await;
950        assert!(matches!(result, Err(Error::Fenced)));
951    }
952
953    #[tokio::test]
954    async fn should_fence_uninitialized_consumer() {
955        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
956        let producer =
957            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
958
959        producer
960            .enqueue("a.batch".to_string(), vec![])
961            .await
962            .unwrap();
963
964        let consumer =
965            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
966
967        let result = consumer.peek().await;
968        assert!(matches!(result, Err(Error::Fenced)));
969    }
970
971    #[tokio::test]
972    async fn should_wrap_epoch_to_zero_at_max() {
973        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
974
975        let mut manifest = Manifest::empty();
976        manifest.set_epoch(u64::MAX - 1);
977        let path = Path::from(TEST_MANIFEST_PATH);
978        store
979            .put(
980                &path,
981                PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
982            )
983            .await
984            .unwrap();
985
986        let consumer =
987            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
988        consumer.initialize().await.unwrap();
989
990        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
991        assert_eq!(manifest.epoch, 0);
992    }
993
994    #[tokio::test]
995    async fn should_peek_first_entry_with_valid_epoch() {
996        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
997        let producer =
998            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
999
1000        producer
1001            .enqueue("a.batch".to_string(), vec![])
1002            .await
1003            .unwrap();
1004        producer
1005            .enqueue("b.batch".to_string(), vec![])
1006            .await
1007            .unwrap();
1008
1009        let consumer =
1010            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1011        consumer.initialize().await.unwrap();
1012
1013        let entry = consumer.peek().await.unwrap().unwrap();
1014        assert_eq!(entry.location, "a.batch");
1015    }
1016
1017    #[tokio::test]
1018    async fn should_dequeue_entries_with_valid_epoch() {
1019        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1020        let producer =
1021            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1022
1023        producer
1024            .enqueue("a.batch".to_string(), vec![])
1025            .await
1026            .unwrap();
1027        producer
1028            .enqueue("b.batch".to_string(), vec![])
1029            .await
1030            .unwrap();
1031        producer
1032            .enqueue("c.batch".to_string(), vec![])
1033            .await
1034            .unwrap();
1035
1036        let consumer =
1037            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1038        consumer.initialize().await.unwrap();
1039
1040        let removed = consumer.dequeue(1).await.unwrap();
1041        assert_eq!(removed.len(), 2);
1042        assert_eq!(removed[0].location, "a.batch");
1043        assert_eq!(removed[1].location, "b.batch");
1044
1045        let next = consumer.peek().await.unwrap().unwrap();
1046        assert_eq!(next.location, "c.batch");
1047    }
1048
1049    #[tokio::test]
1050    async fn should_enqueue_after_consumer_dequeue() {
1051        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1052        let producer =
1053            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1054
1055        producer
1056            .enqueue("a.batch".to_string(), vec![])
1057            .await
1058            .unwrap();
1059        producer
1060            .enqueue("b.batch".to_string(), vec![])
1061            .await
1062            .unwrap();
1063
1064        let consumer =
1065            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1066        consumer.initialize().await.unwrap();
1067
1068        consumer.dequeue(1).await.unwrap();
1069
1070        producer
1071            .enqueue("c.batch".to_string(), vec![])
1072            .await
1073            .unwrap();
1074
1075        let next = consumer.peek().await.unwrap().unwrap();
1076        assert_eq!(next.location, "c.batch");
1077        assert_eq!(next.sequence, 2);
1078    }
1079
1080    #[tokio::test]
1081    async fn should_enqueue_locations_to_manifest() {
1082        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1083        let producer =
1084            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1085
1086        producer
1087            .enqueue("path/to/file1.batch".to_string(), vec![])
1088            .await
1089            .unwrap();
1090        producer
1091            .enqueue("path/to/file2.batch".to_string(), vec![])
1092            .await
1093            .unwrap();
1094
1095        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1096        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1097        assert_eq!(
1098            locations,
1099            vec!["path/to/file1.batch", "path/to/file2.batch"]
1100        );
1101    }
1102
1103    #[tokio::test]
1104    async fn should_merge_with_existing_manifest() {
1105        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1106
1107        let existing = Manifest::from_entries(&[QueueEntry {
1108            sequence: 0,
1109            location: "existing/file.batch".to_string(),
1110            metadata: vec![],
1111        }]);
1112        let path = Path::from(TEST_MANIFEST_PATH);
1113        store
1114            .put(
1115                &path,
1116                PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1117            )
1118            .await
1119            .unwrap();
1120
1121        let producer =
1122            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1123        producer
1124            .enqueue("new/file.batch".to_string(), vec![])
1125            .await
1126            .unwrap();
1127
1128        let manifest = read_producer_manifest(&store, "test/manifest").await;
1129        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1130        assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1131    }
1132
1133    fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1134        QueueEntry::new(location.to_string(), metadata).unwrap()
1135    }
1136
1137    fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1138        QueueEntry {
1139            sequence: seq,
1140            location: location.to_string(),
1141            metadata,
1142        }
1143    }
1144
1145    fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1146        Metadata {
1147            start_index,
1148            ingestion_time_ms: time_ms,
1149            payload: Bytes::from(data.to_string()),
1150        }
1151    }
1152
1153    fn collect_locations(manifest: &Manifest) -> Vec<String> {
1154        manifest.iter().map(|e| e.unwrap().location).collect()
1155    }
1156
1157    #[test]
1158    fn should_create_empty_manifest() {
1159        let m = Manifest::empty();
1160
1161        assert_eq!(m.entries_count(), 0);
1162        assert!(m.is_empty());
1163        assert_eq!(m.epoch, 0);
1164
1165        let bytes = m.to_bytes().unwrap();
1166        assert_eq!(bytes.len(), FOOTER_SIZE);
1167        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1168        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1169        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1170        assert_eq!(
1171            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1172            MANIFEST_VERSION
1173        );
1174    }
1175
1176    #[test]
1177    fn should_parse_valid_manifest_bytes() {
1178        let entries = vec![
1179            entry_seq(0, "a", vec![meta(0, 1, "x")]),
1180            entry_seq(1, "b", vec![meta(0, 2, "y")]),
1181        ];
1182        let data = Manifest::from_entries(&entries).to_bytes().unwrap();
1183
1184        let m = Manifest::from_bytes(data).unwrap();
1185
1186        assert_eq!(m.entries_count(), 2);
1187    }
1188
1189    #[test]
1190    fn should_parse_footer_only_bytes() {
1191        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1192        buf.put_u32_le(0);
1193        buf.put_u64_le(42);
1194        buf.put_u64_le(0);
1195        buf.put_u16_le(MANIFEST_VERSION);
1196
1197        let m = Manifest::from_bytes(buf.freeze()).unwrap();
1198
1199        assert_eq!(m.entries_count(), 0);
1200        assert_eq!(m.epoch, 0);
1201
1202        let mut m = m;
1203        m.append(&entry("loc", vec![])).unwrap();
1204        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1205        assert_eq!(entries[0].sequence, 42);
1206    }
1207
1208    #[test]
1209    fn should_reject_empty_bytes() {
1210        let err = Manifest::from_bytes(Bytes::new()).unwrap_err();
1211
1212        assert!(err.to_string().contains("must not be empty"));
1213    }
1214
1215    #[test]
1216    fn should_reject_bytes_too_short_for_footer() {
1217        let err = Manifest::from_bytes(Bytes::from_static(&[0; 21])).unwrap_err();
1218
1219        assert!(err.to_string().contains("too short for footer"));
1220    }
1221
1222    #[test]
1223    fn should_reject_wrong_version() {
1224        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1225        buf.put_u32_le(0);
1226        buf.put_u64_le(0);
1227        buf.put_u64_le(0);
1228        buf.put_u16_le(99);
1229
1230        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1231
1232        assert!(err.to_string().contains("unsupported"));
1233        assert!(err.to_string().contains("99"));
1234    }
1235
1236    #[test]
1237    fn should_reject_version_zero() {
1238        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1239        buf.put_u32_le(0);
1240        buf.put_u64_le(0);
1241        buf.put_u64_le(0);
1242        buf.put_u16_le(0);
1243
1244        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1245
1246        assert!(err.to_string().contains("unsupported"));
1247    }
1248
1249    #[test]
1250    fn should_make_appended_entry_accessible_via_iter() {
1251        let mut m = Manifest::empty();
1252
1253        m.append(&entry("loc", vec![meta(0, 42, "meta")])).unwrap();
1254
1255        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1256        assert_eq!(entries.len(), 1);
1257        assert_eq!(entries[0].sequence, 0);
1258        assert_eq!(entries[0].location, "loc");
1259        assert_eq!(entries[0].metadata, vec![meta(0, 42, "meta")]);
1260    }
1261
1262    #[test]
1263    fn should_append_to_existing_base_entries() {
1264        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1265        let data = base.to_bytes().unwrap();
1266        let mut m = Manifest::from_bytes(data).unwrap();
1267
1268        m.append(&entry("appended", vec![])).unwrap();
1269
1270        assert_eq!(m.entries_count(), 2);
1271        assert_eq!(collect_locations(&m), vec!["base", "appended"]);
1272        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1273        assert_eq!(entries[0].sequence, 0);
1274        assert_eq!(entries[1].sequence, 1);
1275    }
1276
1277    #[test]
1278    fn should_preserve_append_order() {
1279        let mut m = Manifest::empty();
1280
1281        m.append(&entry("a", vec![])).unwrap();
1282        m.append(&entry("b", vec![])).unwrap();
1283        m.append(&entry("c", vec![])).unwrap();
1284
1285        assert_eq!(collect_locations(&m), vec!["a", "b", "c"]);
1286        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1287        assert_eq!(entries[0].sequence, 0);
1288        assert_eq!(entries[1].sequence, 1);
1289        assert_eq!(entries[2].sequence, 2);
1290    }
1291
1292    #[test]
1293    fn should_handle_entry_with_empty_location() {
1294        let m = Manifest::from_entries(&[entry_seq(0, "", vec![])]);
1295
1296        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1297        assert_eq!(decoded[0].location, "");
1298        assert!(decoded[0].metadata.is_empty());
1299    }
1300
1301    #[test]
1302    fn should_handle_entry_with_large_metadata() {
1303        let big_meta = Bytes::from(vec![0xAB_u8; 1024]);
1304
1305        let m = Manifest::from_entries(&[entry_seq(
1306            0,
1307            "loc",
1308            vec![Metadata {
1309                start_index: 0,
1310                ingestion_time_ms: 1,
1311                payload: big_meta.clone(),
1312            }],
1313        )]);
1314
1315        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1316        assert_eq!(decoded[0].metadata.len(), 1);
1317        assert_eq!(decoded[0].metadata[0].payload, big_meta);
1318    }
1319
1320    #[test]
1321    fn should_handle_negative_ingestion_time() {
1322        let m = Manifest::from_entries(&[entry_seq(0, "loc", vec![meta(0, -1000, "")])]);
1323
1324        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1325        assert_eq!(decoded[0].metadata[0].ingestion_time_ms, -1000);
1326    }
1327
1328    #[test]
1329    fn should_return_footer_for_empty_manifest() {
1330        let m = Manifest::empty();
1331
1332        let bytes = m.to_bytes().unwrap();
1333
1334        assert_eq!(bytes.len(), FOOTER_SIZE);
1335        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1336        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1337        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1338        assert_eq!(
1339            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1340            MANIFEST_VERSION
1341        );
1342    }
1343
1344    #[test]
1345    fn should_merge_base_and_appended() {
1346        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1347        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1348        m.append(&entry("appended", vec![])).unwrap();
1349
1350        let serialized = m.to_bytes().unwrap();
1351        let reparsed = Manifest::from_bytes(serialized).unwrap();
1352
1353        assert_eq!(reparsed.entries_count(), 2);
1354        assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
1355        let entries: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1356        assert_eq!(entries[0].sequence, 0);
1357        assert_eq!(entries[1].sequence, 1);
1358    }
1359
1360    #[test]
1361    fn should_write_correct_footer_count() {
1362        let base = Manifest::from_entries(&[entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])]);
1363        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1364        m.append(&entry("c", vec![])).unwrap();
1365        m.append(&entry("d", vec![])).unwrap();
1366        m.append(&entry("e", vec![])).unwrap();
1367
1368        let bytes = m.to_bytes().unwrap();
1369
1370        let footer_start = bytes.len() - FOOTER_SIZE;
1371        let count = u32::from_le_bytes(bytes[footer_start..footer_start + 4].try_into().unwrap());
1372        let next_seq = u64::from_le_bytes(
1373            bytes[footer_start + 4..footer_start + 12]
1374                .try_into()
1375                .unwrap(),
1376        );
1377        let epoch = u64::from_le_bytes(
1378            bytes[footer_start + 12..footer_start + 20]
1379                .try_into()
1380                .unwrap(),
1381        );
1382        let version = u16::from_le_bytes(bytes[footer_start + 20..].try_into().unwrap());
1383        assert_eq!(count, 5);
1384        assert_eq!(next_seq, 5);
1385        assert_eq!(epoch, 0);
1386        assert_eq!(version, MANIFEST_VERSION);
1387    }
1388
1389    #[test]
1390    fn should_round_trip_from_entries_to_bytes_from_bytes() {
1391        let entries = vec![
1392            entry_seq(0, "a", vec![meta(0, 10, "m1")]),
1393            entry_seq(1, "b", vec![meta(0, 20, "m2")]),
1394        ];
1395        let original = Manifest::from_entries(&entries);
1396
1397        let reparsed = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1398
1399        assert_eq!(reparsed.entries_count(), 2);
1400        let decoded: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1401        assert_eq!(decoded[0].sequence, 0);
1402        assert_eq!(decoded[0].location, "a");
1403        assert_eq!(decoded[0].metadata, vec![meta(0, 10, "m1")]);
1404        assert_eq!(decoded[1].sequence, 1);
1405        assert_eq!(decoded[1].location, "b");
1406        assert_eq!(decoded[1].metadata, vec![meta(0, 20, "m2")]);
1407    }
1408
1409    #[test]
1410    fn should_round_trip_append_serialize_reparse() {
1411        let mut m = Manifest::empty();
1412        m.append(&entry("x", vec![meta(0, 100, "data")])).unwrap();
1413        m.append(&entry("y", vec![meta(0, 200, "more")])).unwrap();
1414
1415        let reparsed = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1416
1417        assert_eq!(reparsed.entries_count(), 2);
1418        assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);
1419    }
1420
1421    #[test]
1422    fn should_chain_serialize_reparse_append() {
1423        let original = Manifest::from_entries(&[entry_seq(0, "a", vec![])]);
1424        let mut m = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1425        m.append(&entry("b", vec![])).unwrap();
1426
1427        let mut m2 = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1428        m2.append(&entry("c", vec![])).unwrap();
1429
1430        let final_m = Manifest::from_bytes(m2.to_bytes().unwrap()).unwrap();
1431
1432        assert_eq!(final_m.entries_count(), 3);
1433        assert_eq!(collect_locations(&final_m), vec!["a", "b", "c"]);
1434        let entries: Vec<QueueEntry> = final_m.iter().map(|e| e.unwrap()).collect();
1435        assert_eq!(entries[2].sequence, 2);
1436    }
1437
1438    #[test]
1439    fn should_dequeue_entries_through_sequence() {
1440        let mut m = Manifest::empty();
1441        for _ in 0..5 {
1442            m.append(&entry("loc", vec![])).unwrap();
1443        }
1444
1445        let removed = m.dequeue(2).unwrap();
1446
1447        assert_eq!(removed.len(), 3);
1448        assert_eq!(removed[0].sequence, 0);
1449        assert_eq!(removed[1].sequence, 1);
1450        assert_eq!(removed[2].sequence, 2);
1451        assert_eq!(m.entries_count(), 2);
1452        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1453        assert_eq!(remaining[0].sequence, 3);
1454        assert_eq!(remaining[1].sequence, 4);
1455        assert_eq!(m.next_sequence, 5);
1456    }
1457
1458    #[test]
1459    fn should_dequeue_all_entries() {
1460        let mut m = Manifest::empty();
1461        for _ in 0..3 {
1462            m.append(&entry("loc", vec![])).unwrap();
1463        }
1464
1465        let removed = m.dequeue(2).unwrap();
1466
1467        assert_eq!(removed.len(), 3);
1468        assert!(m.is_empty());
1469        assert_eq!(m.next_sequence, 3);
1470    }
1471
1472    #[test]
1473    fn should_dequeue_nothing_when_sequence_below_first() {
1474        let entries = vec![
1475            entry_seq(5, "a", vec![]),
1476            entry_seq(6, "b", vec![]),
1477            entry_seq(7, "c", vec![]),
1478        ];
1479        let mut m = Manifest::from_entries(&entries);
1480
1481        let removed = m.dequeue(3).unwrap();
1482
1483        assert!(removed.is_empty());
1484        assert_eq!(m.entries_count(), 3);
1485    }
1486
1487    #[test]
1488    fn should_append_after_dequeue() {
1489        let mut m = Manifest::empty();
1490        for _ in 0..3 {
1491            m.append(&entry("loc", vec![])).unwrap();
1492        }
1493
1494        m.dequeue(0).unwrap();
1495
1496        assert_eq!(m.entries_count(), 2);
1497        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1498        assert_eq!(remaining[0].sequence, 1);
1499        assert_eq!(remaining[1].sequence, 2);
1500
1501        m.append(&entry("new", vec![])).unwrap();
1502        let all: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1503        assert_eq!(all.len(), 3);
1504        assert_eq!(all[2].sequence, 3);
1505    }
1506
1507    /// Serialize a single entry into raw bytes (without footer).
1508    fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1509        let mut buf = BytesMut::new();
1510        Manifest::encode_entry(&mut buf, entry).unwrap();
1511        buf.to_vec()
1512    }
1513
1514    /// Wrap raw entry bytes with a manifest footer (entry_count=1) so that
1515    /// `Manifest::from_bytes` + `.iter()` exercises `decode_entry`.
1516    fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1517        let mut buf = BytesMut::new();
1518        buf.extend_from_slice(entry_bytes);
1519        buf.put_u32_le(1); // entry_count
1520        buf.put_u64_le(1); // next_sequence
1521        buf.put_u64_le(0); // epoch
1522        buf.put_u16_le(MANIFEST_VERSION);
1523        Manifest::from_bytes(buf.freeze()).unwrap()
1524    }
1525
1526    /// Offset of metadata_count inside the encoded entry (for location "a").
1527    /// Layout: entry_len(4) + sequence(8) + location_len(2) + location(1)
1528    const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;
1529
1530    /// Build an entry with no metadata, then corrupt metadata_count to `count`
1531    /// and extend the buffer with `extra_bytes` after metadata_count to simulate
1532    /// partial metadata. Also patches entry_len to match the new total size.
1533    fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
1534        let e = QueueEntry {
1535            sequence: 1,
1536            location: "a".to_string(),
1537            metadata: vec![],
1538        };
1539        let mut raw = encode_entry_bytes(&e);
1540        // Overwrite metadata_count
1541        raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
1542        // Append extra bytes (partial metadata fields)
1543        raw.extend_from_slice(extra_bytes);
1544        // Patch entry_len to cover the full buffer after the 4-byte prefix
1545        let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
1546        raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
1547        raw
1548    }
1549
1550    #[test]
1551    fn should_reject_trailing_bytes_before_footer() {
1552        // Build a valid entry, then add garbage bytes before the footer.
1553        let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
1554        raw.extend_from_slice(&[0xFFu8; 5]); // trailing garbage
1555
1556        let manifest = manifest_from_raw_entry(&raw);
1557        let items: Vec<Result<QueueEntry>> = manifest.iter().collect();
1558        assert_eq!(items.len(), 2);
1559        assert!(items[0].is_ok());
1560        let err = items[1].as_ref().unwrap_err();
1561        assert!(
1562            err.to_string().contains("did not consume all bytes"),
1563            "got: {}",
1564            err
1565        );
1566    }
1567
1568    #[test]
1569    fn should_reject_entry_with_entry_len_below_minimum() {
1570        // entry_len too small: less than SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE (14)
1571        let bad_entry_len = (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE - 1) as u32;
1572        let mut raw = Vec::new();
1573        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1574        raw.extend_from_slice(&[0u8; 13]); // enough raw bytes to not truncate
1575
1576        let manifest = manifest_from_raw_entry(&raw);
1577        let err = manifest.iter().next().unwrap().unwrap_err();
1578        assert!(err.to_string().contains(
1579            "entry length 13 is less than minimum entry length 14 for the length of the location 0"
1580        ));
1581    }
1582
1583    #[test]
1584    fn should_reject_entry_with_entry_len_below_minimum_for_location() {
1585        let location = "abc";
1586        // entry_len covers fixed fields but not the full location
1587        let bad_entry_len =
1588            (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + location.len() - 1) as u32;
1589        let mut raw = Vec::new();
1590        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1591        raw.extend_from_slice(&0u64.to_le_bytes()); // sequence
1592        raw.extend_from_slice(&(location.len() as u16).to_le_bytes()); // location_len
1593        raw.extend_from_slice(&[0u8; 20]); // padding so entry doesn't extend beyond data
1594
1595        let manifest = manifest_from_raw_entry(&raw);
1596        let err = manifest.iter().next().unwrap().unwrap_err();
1597        assert!(
1598            err.to_string()
1599                .contains("entry length 16 is less than minimum entry length 17"),
1600            "got: {}",
1601            err
1602        );
1603    }
1604
1605    #[test]
1606    fn should_reject_entry_truncated_before_entry_len() {
1607        // Data too short to even read entry_len (need 4 bytes, provide 2)
1608        let manifest = manifest_from_raw_entry(&[0u8; 2]);
1609        let err = manifest.iter().next().unwrap().unwrap_err();
1610        assert!(
1611            matches!(&err, Error::Serialization(msg) if msg.contains("entry length field does not fit")),
1612            "unexpected error: {err}"
1613        );
1614    }
1615
1616    #[test]
1617    fn should_reject_entry_truncated_before_metadata_start_index() {
1618        // metadata_count = 1 but no metadata bytes at all
1619        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &[]));
1620        let err = manifest.iter().next().unwrap().unwrap_err();
1621        assert!(
1622            matches!(&err, Error::Serialization(msg) if msg.contains("start index field does not fit")),
1623            "unexpected error: {err}"
1624        );
1625    }
1626
1627    #[test]
1628    fn should_reject_entry_truncated_before_metadata_ingestion_time() {
1629        // metadata_count = 1, only start_index present (4 bytes)
1630        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &0u32.to_le_bytes()));
1631        let err = manifest.iter().next().unwrap().unwrap_err();
1632        assert!(
1633            matches!(&err, Error::Serialization(msg) if msg.contains("ingestion time field does not fit")),
1634            "unexpected error: {err}"
1635        );
1636    }
1637
1638    #[test]
1639    fn should_reject_entry_truncated_before_metadata_length() {
1640        // metadata_count = 1, start_index + ingestion_time present, but no m_len
1641        let mut extra = Vec::new();
1642        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1643        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1644        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1645        let err = manifest.iter().next().unwrap().unwrap_err();
1646        assert!(
1647            matches!(&err, Error::Serialization(msg) if msg.contains("metadata length field does not fit")),
1648            "unexpected error: {err}"
1649        );
1650    }
1651
1652    #[test]
1653    fn should_reject_entry_truncated_before_metadata_payload() {
1654        // metadata_count = 1, all fixed fields present, m_len says 10 but only 2 bytes follow
1655        let mut extra = Vec::new();
1656        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1657        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1658        extra.extend_from_slice(&10u32.to_le_bytes()); // m_len = 10
1659        extra.extend_from_slice(&[0xAB, 0xCD]); // only 2 payload bytes
1660        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1661        let err = manifest.iter().next().unwrap().unwrap_err();
1662        assert!(
1663            matches!(&err, Error::Serialization(msg) if msg.contains("metadata has less bytes than set")),
1664            "unexpected error: {err}"
1665        );
1666    }
1667
1668    #[tokio::test]
1669    async fn should_reject_location_exceeding_u16_max() {
1670        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1671        let producer =
1672            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1673
1674        let long_location = "x".repeat(u16::MAX as usize + 1);
1675        let result = producer.enqueue(long_location, vec![]).await;
1676        assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1677    }
1678}