Skip to main content

buffer/
queue.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7    Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// Format version written as the last field of the manifest footer.
const MANIFEST_VERSION: u16 = 1;
/// Sentinel epoch held by a consumer before `initialize` has succeeded.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;
/// Width in bytes of the `u32` length prefix in front of every encoded entry.
const ENTRY_LEN_SIZE: usize = 4;
/// Width in bytes of the `u16` location-length field of an entry.
const LOCATION_LEN_SIZE: usize = 2;
/// Width in bytes of the `i64` ingestion-time field of a metadata record.
const INGESTION_TIME_MS_SIZE: usize = 8;
/// Width in bytes of the `u32` payload-length field of a metadata record.
const METADATA_LEN_SIZE: usize = 4;
/// Width in bytes of the `u32` start-index field of a metadata record.
const START_INDEX_SIZE: usize = 4;
/// Width in bytes of the `u32` metadata-record count of an entry.
const METADATA_COUNT_SIZE: usize = 4;
/// Width in bytes of the `u32` entry count stored in the footer.
const ENTRIES_COUNT_SIZE: usize = 4;
/// Width in bytes of a `u64` sequence number (per entry and in the footer).
const SEQUENCE_SIZE: usize = 8;
/// Width in bytes of the `u64` epoch stored in the footer.
const EPOCH_SIZE: usize = 8;
/// Width in bytes of the `u16` format version stored in the footer.
const VERSION_SIZE: usize = 2;
/// Total footer width: entry count, next sequence, epoch, version (in that order).
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
/// Per-range metadata attached to a batch by the buffer.
///
/// Each record is serialized inside a queue entry as `start_index` (`u32`),
/// `ingestion_time_ms` (`i64`), the payload length (`u32`) and the payload
/// bytes, all little-endian.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    /// Index of the first entry in the batch that this metadata range covers.
    pub start_index: u32,
    /// Wall-clock ingestion time in milliseconds since the Unix epoch.
    pub ingestion_time_ms: i64,
    /// Opaque metadata payload supplied by the caller of [`Producer::produce`](crate::Producer::produce).
    pub payload: Bytes,
}
36
/// One decoded queue entry: a sequence number, a location string and the
/// metadata records attached to it.
#[derive(Debug, Clone)]
pub(crate) struct QueueEntry {
    /// Sequence number of the entry; assigned by the manifest when appended.
    pub(crate) sequence: u64,
    /// Location string stored with the entry (at most `u16::MAX` bytes when encoded).
    pub(crate) location: String,
    /// Metadata records carried by the entry, in encoded order.
    pub(crate) metadata: Vec<Metadata>,
}
43
44impl QueueEntry {
45    fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46        if location.len() > u16::MAX as usize {
47            return Err(Error::InvalidInput(format!(
48                "location length {} exceeds u16::MAX",
49                location.len()
50            )));
51        }
52        if metadata.len() > u32::MAX as usize {
53            return Err(Error::InvalidInput(format!(
54                "metadata count {} exceeds u32::MAX",
55                metadata.len()
56            )));
57        }
58        Ok(Self {
59            sequence: 0,
60            location,
61            metadata,
62        })
63    }
64
65    fn clone_with_sequence(&self, sequence: u64) -> Self {
66        Self {
67            sequence,
68            ..self.clone()
69        }
70    }
71}
72
/// In-memory form of the queue manifest.
///
/// `data` holds the serialized manifest (encoded entries followed by the
/// footer); entries added locally are kept separately in `appended` until
/// `to_bytes` or `dequeue` merges them.
#[derive(Debug, Clone)]
pub(crate) struct Manifest {
    /// Serialized manifest bytes: encoded entries followed by the footer.
    data: Bytes,
    /// Encoded entries added through `append` that are not yet part of `data`.
    appended: BytesMut,
    /// Number of entries encoded in `appended`.
    appended_count: usize,
    /// Sequence number the next appended entry will receive.
    next_sequence: u64,
    /// Epoch value mirrored from (and written back to) the footer.
    epoch: u64,
}
81
82impl Manifest {
83    /// Create an empty manifest with a valid footer.
84    fn empty() -> Self {
85        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
86        buf.put_u32_le(0);
87        buf.put_u64_le(0);
88        buf.put_u64_le(0);
89        buf.put_u16_le(MANIFEST_VERSION);
90        Self {
91            data: buf.freeze(),
92            appended: BytesMut::new(),
93            appended_count: 0,
94            next_sequence: 0,
95            epoch: 0,
96        }
97    }
98
99    /// Wrap raw binary data as a queue manifest, validating the footer.
100    pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
101        if data.is_empty() {
102            return Err(Error::Serialization(
103                "queue manifest data must not be empty".to_string(),
104            ));
105        }
106        if data.len() < FOOTER_SIZE {
107            return Err(Error::Serialization(
108                "queue manifest too short for footer".to_string(),
109            ));
110        }
111        let version_start = data.len() - VERSION_SIZE;
112        let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
113        if version != MANIFEST_VERSION {
114            return Err(Error::Serialization(format!(
115                "unsupported queue manifest version: {}",
116                version
117            )));
118        }
119        let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
120        let epoch = u64::from_le_bytes(
121            data[epoch_start..epoch_start + EPOCH_SIZE]
122                .try_into()
123                .unwrap(),
124        );
125        let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
126        let next_sequence = u64::from_le_bytes(
127            data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
128                .try_into()
129                .unwrap(),
130        );
131        Ok(Self {
132            data,
133            appended: BytesMut::new(),
134            appended_count: 0,
135            next_sequence,
136            epoch,
137        })
138    }
139
140    /// Build a manifest from a slice of entries.
141    #[cfg(test)]
142    fn from_entries(entries: &[QueueEntry]) -> Self {
143        let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
144        let mut buf = BytesMut::new();
145        for entry in entries {
146            Self::encode_entry(&mut buf, entry).unwrap();
147        }
148        buf.put_u32_le(entries.len() as u32);
149        buf.put_u64_le(next_sequence);
150        buf.put_u64_le(0);
151        buf.put_u16_le(MANIFEST_VERSION);
152        Self {
153            data: buf.freeze(),
154            appended: BytesMut::new(),
155            appended_count: 0,
156            next_sequence,
157            epoch: 0,
158        }
159    }
160
161    /// Number of entries (read from the footer, O(1)).
162    fn entries_count(&self) -> usize {
163        let base = self.existing_entries_count();
164        base + self.appended_count
165    }
166
167    /// Whether the manifest contains no entries.
168    #[cfg(test)]
169    fn is_empty(&self) -> bool {
170        self.entries_count() == 0
171    }
172
173    /// Return a borrowing iterator that lazily deserializes entries.
174    pub(crate) fn iter(&self) -> ManifestIter<'_> {
175        let base_count = self.existing_entries_count();
176        let entries_end = if self.data.is_empty() {
177            0
178        } else {
179            self.data.len() - FOOTER_SIZE
180        };
181        ManifestIter {
182            data: &self.data,
183            offset: 0,
184            remaining: base_count,
185            entries_end,
186            appended: &self.appended,
187            appended_offset: 0,
188            appended_remaining: self.appended_count,
189        }
190    }
191
192    fn existing_entries_count(&self) -> usize {
193        if self.data.is_empty() {
194            0
195        } else {
196            let footer_start = self.data.len() - FOOTER_SIZE;
197            u32::from_le_bytes(
198                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
199                    .try_into()
200                    .unwrap(),
201            ) as usize
202        }
203    }
204
205    /// Append a single entry without copying existing data.
206    /// The entry is encoded and stored internally; bytes are merged in `to_bytes()`.
207    /// The entry's sequence number is overwritten with the manifest's next sequence.
208    fn append(&mut self, entry: &QueueEntry) -> Result<()> {
209        let sequenced = entry.clone_with_sequence(self.next_sequence);
210        Self::encode_entry(&mut self.appended, &sequenced)?;
211        self.next_sequence += 1;
212        self.appended_count += 1;
213        Ok(())
214    }
215
216    /// Remove all entries with sequence <= `through_sequence`, returning them.
217    ///
218    /// Optimized to avoid deserializing/re-serializing remaining entries: only the
219    /// removed entries are fully decoded, while remaining entries are byte-copied.
220    fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
221        let next_seq = self.next_sequence;
222        let epoch = self.epoch;
223
224        let base_count = self.existing_entries_count();
225        let entries_end = if self.data.is_empty() {
226            0
227        } else {
228            self.data.len() - FOOTER_SIZE
229        };
230
231        let (mut removed, remaining_base_start, remaining_base_count) =
232            split_entries(&self.data, base_count, entries_end, through_sequence)?;
233
234        let appended_end = self.appended.len();
235        let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
236            &self.appended,
237            self.appended_count,
238            appended_end,
239            through_sequence,
240        )?;
241        removed.extend(appended_removed);
242
243        let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
244        let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
245        let total_remaining = remaining_base_count + remaining_appended_count;
246
247        let mut buf = BytesMut::with_capacity(
248            remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
249        );
250        buf.extend_from_slice(remaining_base_bytes);
251        buf.extend_from_slice(remaining_appended_bytes);
252        buf.put_u32_le(total_remaining);
253        buf.put_u64_le(next_seq);
254        buf.put_u64_le(epoch);
255        buf.put_u16_le(MANIFEST_VERSION);
256
257        self.data = buf.freeze();
258        self.appended = BytesMut::new();
259        self.appended_count = 0;
260        self.next_sequence = next_seq;
261        self.epoch = epoch;
262
263        Ok(removed)
264    }
265
266    /// Set the epoch and patch the data bytes in place.
267    fn set_epoch(&mut self, epoch: u64) {
268        self.epoch = epoch;
269        let mut buf = BytesMut::from(self.data.as_ref());
270        let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
271        buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
272        self.data = buf.freeze();
273    }
274
275    /// Serialize the manifest to bytes for writing to object storage.
276    /// When an entry was appended, this merges existing data with the appended
277    /// entry and writes a new footer.
278    fn to_bytes(&self) -> Result<Bytes> {
279        if self.appended.is_empty() {
280            return Ok(self.data.clone());
281        }
282        let (prefix, base_count) = if self.data.is_empty() {
283            (&[] as &[u8], 0u32)
284        } else {
285            let footer_start = self.data.len() - FOOTER_SIZE;
286            let count = u32::from_le_bytes(
287                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
288                    .try_into()
289                    .unwrap(),
290            );
291            (&self.data[..footer_start], count)
292        };
293        let total_count: u32 = base_count
294            .checked_add(self.appended_count as u32)
295            .ok_or_else(|| {
296                Error::Serialization(format!(
297                    "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
298                    base_count, self.appended_count
299                ))
300            })?;
301        let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
302        buf.extend_from_slice(prefix);
303        buf.extend_from_slice(&self.appended);
304        buf.put_u32_le(total_count);
305        buf.put_u64_le(self.next_sequence);
306        buf.put_u64_le(self.epoch);
307        buf.put_u16_le(MANIFEST_VERSION);
308        Ok(buf.freeze())
309    }
310
311    fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
312        debug_assert!(entry.location.len() <= u16::MAX as usize);
313        let metadata_size: usize = METADATA_COUNT_SIZE
314            + entry
315                .metadata
316                .iter()
317                .map(|m| {
318                    START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
319                })
320                .sum::<usize>();
321        let entry_body_len =
322            SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
323        debug_assert!(entry_body_len <= u32::MAX as usize);
324        buf.put_u32_le(entry_body_len as u32);
325        buf.put_u64_le(entry.sequence);
326        buf.put_u16_le(entry.location.len() as u16);
327        buf.extend_from_slice(entry.location.as_bytes());
328        debug_assert!(entry.metadata.len() <= u32::MAX as usize);
329        buf.put_u32_le(entry.metadata.len() as u32);
330        for m in &entry.metadata {
331            if m.payload.len() > u32::MAX as usize {
332                return Err(Error::InvalidInput(format!(
333                    "metadata payload size {} exceeds u32::MAX",
334                    m.payload.len()
335                )));
336            }
337            buf.put_u32_le(m.start_index);
338            buf.put_i64_le(m.ingestion_time_ms);
339            buf.put_u32_le(m.payload.len() as u32);
340            buf.extend_from_slice(&m.payload);
341        }
342        Ok(())
343    }
344}
345
346/// Walk entries in `data[0..end]`, splitting at `through_sequence`.
347/// Entries with sequence <= through_sequence are fully decoded and returned.
348/// Returns (removed_entries, remaining_start_offset, remaining_count).
349fn split_entries(
350    data: &[u8],
351    count: usize,
352    end: usize,
353    through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355    let mut removed = Vec::new();
356    let mut offset = 0usize;
357
358    for i in 0..count {
359        let entry_start = offset;
360        let entry = decode_entry(data, &mut offset, end)?;
361
362        if entry.sequence <= through_sequence {
363            removed.push(entry);
364        } else {
365            return Ok((removed, entry_start, (count - i) as u32));
366        }
367    }
368
369    Ok((removed, end, 0))
370}
371
372/// Decode a single entry from binary data at the given offset.
373fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374    if *offset + ENTRY_LEN_SIZE > end {
375        return Err(Error::Serialization(
376            "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377        ));
378    }
379
380    let entry_len =
381        u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382    *offset += ENTRY_LEN_SIZE;
383
384    if *offset + entry_len > end {
385        return Err(Error::Serialization(
386            "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387        ));
388    }
389
390    let entry_end = *offset + entry_len;
391
392    let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393    *offset += SEQUENCE_SIZE;
394
395    let location_len = u16::from_le_bytes(
396        data[*offset..*offset + LOCATION_LEN_SIZE]
397            .try_into()
398            .unwrap(),
399    ) as usize;
400    *offset += LOCATION_LEN_SIZE;
401
402    let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403    if entry_len < min_entry_len {
404        return Err(Error::Serialization(format!(
405            "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406            entry_len, min_entry_len, location_len
407        )));
408    }
409
410    let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411        .map_err(|e| Error::Serialization(e.to_string()))?;
412    *offset += location_len;
413
414    let metadata_count = u32::from_le_bytes(
415        data[*offset..*offset + METADATA_COUNT_SIZE]
416            .try_into()
417            .unwrap(),
418    ) as usize;
419    *offset += METADATA_COUNT_SIZE;
420
421    let mut metadata = Vec::with_capacity(metadata_count);
422    for _ in 0..metadata_count {
423        if *offset + START_INDEX_SIZE > end {
424            return Err(Error::Serialization(
425                "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426            ));
427        }
428        let start_index = u32::from_le_bytes(
429            data[*offset..*offset + START_INDEX_SIZE]
430                .try_into()
431                .unwrap(),
432        );
433        *offset += START_INDEX_SIZE;
434
435        if *offset + INGESTION_TIME_MS_SIZE > end {
436            return Err(Error::Serialization(
437                "queue entry corrupt: size of ingestion time field does not fit in entry"
438                    .to_string(),
439            ));
440        }
441        let ingestion_time_ms = i64::from_le_bytes(
442            data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443                .try_into()
444                .unwrap(),
445        );
446        *offset += INGESTION_TIME_MS_SIZE;
447
448        if *offset + METADATA_LEN_SIZE > end {
449            return Err(Error::Serialization(
450                "queue entry corrupt: size of metadata length field does not fit in entry"
451                    .to_string(),
452            ));
453        }
454        let m_len = u32::from_le_bytes(
455            data[*offset..*offset + METADATA_LEN_SIZE]
456                .try_into()
457                .unwrap(),
458        ) as usize;
459        *offset += METADATA_LEN_SIZE;
460
461        if *offset + m_len > end {
462            return Err(Error::Serialization(
463                "queue entry corrupt: metadata has less bytes than set in the metadata length"
464                    .to_string(),
465            ));
466        }
467        metadata.push(Metadata {
468            start_index,
469            ingestion_time_ms,
470            payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471        });
472        *offset += m_len;
473    }
474
475    *offset = entry_end;
476
477    Ok(QueueEntry {
478        sequence,
479        location,
480        metadata,
481    })
482}
483
/// Borrowing iterator over manifest entries. Lazily deserializes each entry.
///
/// Entries stored in `data` are yielded first, followed by the entries held
/// in the `appended` buffer.
pub(crate) struct ManifestIter<'a> {
    /// Serialized manifest bytes (entries followed by the footer).
    data: &'a [u8],
    /// Read position inside `data`.
    offset: usize,
    /// Number of entries in `data` that have not been yielded yet.
    remaining: usize,
    /// Offset in `data` where the entries stop and the footer begins.
    entries_end: usize,
    /// Encoded entries that were appended locally.
    appended: &'a [u8],
    /// Read position inside `appended`.
    appended_offset: usize,
    /// Number of entries in `appended` that have not been yielded yet.
    appended_remaining: usize,
}
494
495impl Iterator for ManifestIter<'_> {
496    type Item = Result<QueueEntry>;
497
498    fn next(&mut self) -> Option<Self::Item> {
499        if self.remaining > 0 {
500            self.remaining -= 1;
501            Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502        } else if self.appended_remaining > 0 {
503            self.appended_remaining -= 1;
504            Some(decode_entry(
505                self.appended,
506                &mut self.appended_offset,
507                self.appended.len(),
508            ))
509        } else if self.offset != self.entries_end {
510            let err = Some(Err(Error::Serialization(format!(
511                "base entries did not consume all bytes: offset {} != entries_end {}",
512                self.offset, self.entries_end
513            ))));
514            self.offset = self.entries_end;
515            err
516        } else {
517            None
518        }
519    }
520}
521
/// Outcome of a failed conditional manifest write.
enum ManifestWriteError {
    /// The conditional put was rejected (precondition failed or the object
    /// already exists); callers re-read the manifest and retry.
    Conflict,
    /// Any other failure; returned to the caller as-is.
    Fatal(Error),
}
526
/// Reads and conditionally writes the manifest object at `manifest_path`.
#[derive(Clone)]
pub(crate) struct ManifestStore {
    /// Object store that holds the manifest.
    pub(crate) object_store: Arc<dyn ObjectStore>,
    /// Path of the manifest object inside `object_store`.
    pub(crate) manifest_path: String,
}
532
533impl ManifestStore {
534    pub(crate) async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
535        let path = Path::from(self.manifest_path.as_str());
536        match self.object_store.get(&path).await {
537            Ok(result) => {
538                let version = UpdateVersion {
539                    e_tag: result.meta.e_tag.clone(),
540                    version: result.meta.version.clone(),
541                };
542                let bytes = result
543                    .bytes()
544                    .await
545                    .map_err(|e| Error::Storage(e.to_string()))?;
546                let manifest = Manifest::from_bytes(bytes)?;
547                Ok((manifest, Some(version)))
548            }
549            Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
550            Err(e) => Err(Error::Storage(e.to_string())),
551        }
552    }
553
554    async fn write(
555        &self,
556        manifest: &Manifest,
557        version: Option<UpdateVersion>,
558    ) -> std::result::Result<(), ManifestWriteError> {
559        let path = Path::from(self.manifest_path.as_str());
560        let put_mode = match version {
561            Some(v) => PutMode::Update(v),
562            None => PutMode::Create,
563        };
564        let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;
565
566        match self
567            .object_store
568            .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
569            .await
570        {
571            Ok(_) => Ok(()),
572            Err(ObjectStoreError::Precondition { .. })
573            | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
574            Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
575        }
576    }
577}
578
/// Counts manifest write attempts and the conflicts among them, both locally
/// and as `metrics` counters labelled with `role`.
struct ConflictCounter {
    /// Number of recorded write attempts.
    write_count: AtomicU64,
    /// Number of recorded write conflicts.
    conflict_count: AtomicU64,
    /// Value of the `role` label attached to the emitted metrics.
    role: &'static str,
}
584
585impl ConflictCounter {
586    fn new(role: &'static str) -> Self {
587        Self {
588            write_count: AtomicU64::new(0),
589            conflict_count: AtomicU64::new(0),
590            role,
591        }
592    }
593
594    fn record_write(&self) {
595        self.write_count.fetch_add(1, Ordering::Relaxed);
596        metrics::counter!(crate::metric_names::MANIFEST_WRITES, "role" => self.role).increment(1);
597    }
598
599    fn record_conflict(&self) {
600        self.conflict_count.fetch_add(1, Ordering::Relaxed);
601        metrics::counter!(crate::metric_names::MANIFEST_CONFLICTS, "role" => self.role)
602            .increment(1);
603    }
604
605    fn conflict_rate(&self) -> f64 {
606        let writes = self.write_count.load(Ordering::Relaxed);
607        if writes == 0 {
608            return 0.0;
609        }
610        let conflicts = self.conflict_count.load(Ordering::Relaxed);
611        let rate = (conflicts as f64 / writes as f64) * 100.0;
612        rate.min(100.0)
613    }
614}
615
/// A producer that appends entries to a shared manifest in object storage.
///
/// Writes use optimistic concurrency: the manifest is read, modified locally,
/// and written back with a conditional put. On conflict the operation is
/// retried automatically until it succeeds.
pub struct QueueProducer {
    /// Access to the manifest object.
    manifest_store: ManifestStore,
    /// Write/conflict statistics for this producer.
    counter: ConflictCounter,
}
625
626impl QueueProducer {
627    /// Create a new producer backed by the given [`ObjectStore`].
628    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
629        Self {
630            manifest_store: ManifestStore {
631                object_store,
632                manifest_path,
633            },
634            counter: ConflictCounter::new("producer"),
635        }
636    }
637
638    /// Append an entry to the queue with the given `location` and `metadata`.
639    ///
640    /// Returns [`Error::InvalidInput`] if `location` exceeds 65 535 bytes or
641    /// `metadata` exceeds 2³²−1 items. The write is retried automatically on
642    /// optimistic-concurrency conflicts.
643    pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
644        let entry = QueueEntry::new(location, metadata)?;
645        loop {
646            let (mut manifest, version) = self.manifest_store.read().await?;
647            manifest.append(&entry)?;
648            self.counter.record_write();
649            match self.manifest_store.write(&manifest, version).await {
650                Ok(()) => return Ok(()),
651                Err(ManifestWriteError::Conflict) => {
652                    self.counter.record_conflict();
653                    continue;
654                }
655                Err(ManifestWriteError::Fatal(e)) => return Err(e),
656            }
657        }
658    }
659
660    /// Return the percentage of manifest writes that encountered a conflict.
661    pub fn conflict_rate(&self) -> f64 {
662        self.counter.conflict_rate()
663    }
664}
665
/// A consumer that reads and dequeues entries from a shared manifest in
/// object storage.
///
/// Single-consumer semantics are enforced through an epoch stored in the
/// manifest. Calling [`QueueConsumer::initialize`] increments the epoch,
/// fencing any previous consumer instance. Every subsequent read or dequeue
/// checks that the local epoch still matches the manifest, returning
/// [`Error::Fenced`] if another consumer has taken over.
pub struct QueueConsumer {
    /// Access to the manifest object.
    manifest_store: ManifestStore,
    /// Epoch claimed by this consumer; `UNINITIALIZED_EPOCH` until
    /// `initialize` succeeds.
    epoch: AtomicU64,
    /// Write/conflict statistics for this consumer.
    counter: ConflictCounter,
    /// Entry count observed at the last manifest read or successful write.
    queue_len: AtomicU64,
}
680
681impl QueueConsumer {
682    /// Create a new consumer backed by the given [`ObjectStore`].
683    ///
684    /// The consumer is not active until [`QueueConsumer::initialize`] is called.
685    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
686        Self {
687            manifest_store: ManifestStore {
688                object_store,
689                manifest_path,
690            },
691            epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
692            counter: ConflictCounter::new("consumer"),
693            queue_len: AtomicU64::new(0),
694        }
695    }
696
697    /// Initialize the consumer by incrementing the epoch in the queue manifest.
698    /// This fences any previous consumer that was using the old epoch.
699    pub async fn initialize(&self) -> Result<()> {
700        loop {
701            let (mut manifest, version) = self.read_manifest().await?;
702            let mut new_epoch = manifest.epoch.wrapping_add(1);
703            if new_epoch == UNINITIALIZED_EPOCH {
704                new_epoch = new_epoch.wrapping_add(1);
705            }
706            manifest.set_epoch(new_epoch);
707            match self.write_manifest(&manifest, version).await {
708                Ok(()) => {
709                    self.epoch.store(new_epoch, Ordering::Relaxed);
710                    return Ok(());
711                }
712                Err(ManifestWriteError::Conflict) => {
713                    self.counter.record_conflict();
714                    continue;
715                }
716                Err(ManifestWriteError::Fatal(e)) => return Err(e),
717            }
718        }
719    }
720
721    /// Return the first entry in the queue without dequeueing it.
722    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
723    ///
724    /// Superseded by [`QueueConsumer::descriptors_after`] for the
725    /// `Consumer::next_batch` path; retained for tests and future
726    /// callers that need a single-entry peek.
727    #[allow(dead_code)]
728    pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
729        let (manifest, _) = self.read_manifest().await?;
730        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
731            return Err(Error::Fenced);
732        }
733        manifest.iter().next().transpose()
734    }
735
736    /// Return up to `max` contiguous queue entries with sequence strictly
737    /// greater than `after_sequence` (or all entries when `after_sequence`
738    /// is `None`), in manifest order.
739    ///
740    /// Reads the manifest once. Does not mutate it. Returns `Fenced` if
741    /// the consumer's epoch does not match the manifest's epoch. This is
742    /// the read-ahead primitive `Consumer::next_descriptors` builds on
743    /// (RFC 0003).
744    pub(crate) async fn descriptors_after(
745        &self,
746        after_sequence: Option<u64>,
747        max: usize,
748    ) -> Result<Vec<QueueEntry>> {
749        if max == 0 {
750            return Ok(Vec::new());
751        }
752        let (manifest, _) = self.read_manifest().await?;
753        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
754            return Err(Error::Fenced);
755        }
756        let mut out = Vec::with_capacity(max);
757        for entry in manifest.iter() {
758            let entry = entry?;
759            if let Some(after) = after_sequence
760                && entry.sequence <= after
761            {
762                continue;
763            }
764            out.push(entry);
765            if out.len() >= max {
766                break;
767            }
768        }
769        Ok(out)
770    }
771
772    /// Return the entry with the given sequence number, or None if not found.
773    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
774    ///
775    /// Superseded by [`QueueConsumer::descriptors_after`] for bulk
776    /// reads; retained for legacy single-sequence lookups.
777    #[allow(dead_code)]
778    pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
779        let (manifest, _) = self.read_manifest().await?;
780        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
781            return Err(Error::Fenced);
782        }
783        manifest
784            .iter()
785            .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
786            .transpose()
787    }
788
789    /// Remove all entries with sequence <= `through_sequence`, returning the removed entries.
790    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
791    pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
792        loop {
793            let (mut manifest, version) = self.read_manifest().await?;
794            if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
795                return Err(Error::Fenced);
796            }
797            let removed = manifest.dequeue(through_sequence)?;
798            match self.write_manifest(&manifest, version).await {
799                Ok(()) => return Ok(removed),
800                Err(ManifestWriteError::Conflict) => {
801                    self.counter.record_conflict();
802                    continue;
803                }
804                Err(ManifestWriteError::Fatal(e)) => return Err(e),
805            }
806        }
807    }
808
809    /// Return the number of entries in the queue as of the last manifest read or write.
810    pub fn len(&self) -> usize {
811        self.queue_len.load(Ordering::Relaxed) as usize
812    }
813
814    async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
815        let result = self.manifest_store.read().await?;
816        self.queue_len
817            .store(result.0.entries_count() as u64, Ordering::Relaxed);
818        Ok(result)
819    }
820
821    async fn write_manifest(
822        &self,
823        manifest: &Manifest,
824        version: Option<UpdateVersion>,
825    ) -> std::result::Result<(), ManifestWriteError> {
826        self.counter.record_write();
827        let result = self.manifest_store.write(manifest, version).await;
828        if result.is_ok() {
829            self.queue_len
830                .store(manifest.entries_count() as u64, Ordering::Relaxed);
831        }
832        result
833    }
834
835    /// Return the percentage of manifest writes that encountered a conflict.
836    pub fn conflict_rate(&self) -> f64 {
837        self.counter.conflict_rate()
838    }
839}
840
/// A single entry in the manifest, as exposed by [`parse_manifest`].
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestEntry {
    /// Sequence number stored with the entry.
    pub sequence: u64,
    /// Location string stored with the entry.
    pub location: String,
    /// Metadata records stored with the entry.
    pub metadata: Vec<Metadata>,
}
848
849/// Read-only view of a parsed buffer queue manifest.
850#[derive(Debug, Clone)]
851pub struct ManifestView {
852    pub epoch: u64,
853    pub next_sequence: u64,
854    entries: Vec<ManifestEntry>,
855}
856
857impl ManifestView {
858    /// Return all entries in the manifest.
859    pub fn entries(&self) -> &[ManifestEntry] {
860        &self.entries
861    }
862}
863
864/// Parse a manifest from its binary representation.
865pub fn parse_manifest(data: Bytes) -> Result<ManifestView> {
866    let manifest = Manifest::from_bytes(data)?;
867    let entries = manifest
868        .iter()
869        .map(|r| {
870            r.map(|e| ManifestEntry {
871                sequence: e.sequence,
872                location: e.location,
873                metadata: e.metadata,
874            })
875        })
876        .collect::<Result<Vec<_>>>()?;
877    Ok(ManifestView {
878        epoch: manifest.epoch,
879        next_sequence: manifest.next_sequence,
880        entries,
881    })
882}
883
884#[cfg(test)]
885mod tests {
886    use super::*;
887    use slatedb::object_store::memory::InMemory;
888
889    const TEST_MANIFEST_PATH: &str = "test/manifest";
890
891    async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
892        let path = Path::from(path);
893        let result = store.get(&path).await.unwrap();
894        let bytes = result.bytes().await.unwrap();
895        Manifest::from_bytes(bytes).unwrap()
896    }
897
898    #[tokio::test]
899    async fn should_initialize_consumer_and_increment_epoch() {
900        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
901        let consumer =
902            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
903
904        consumer.initialize().await.unwrap();
905
906        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
907        assert_eq!(manifest.epoch, 1);
908    }
909
910    #[tokio::test]
911    async fn should_peek_none_when_queue_is_empty() {
912        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
913        let consumer =
914            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
915        consumer.initialize().await.unwrap();
916
917        let result = consumer.peek().await.unwrap();
918        assert!(result.is_none());
919    }
920
921    #[tokio::test]
922    async fn should_read_entry_by_sequence() {
923        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
924        let producer =
925            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
926
927        producer
928            .enqueue("a.batch".to_string(), vec![])
929            .await
930            .unwrap();
931        producer
932            .enqueue("b.batch".to_string(), vec![])
933            .await
934            .unwrap();
935        producer
936            .enqueue("c.batch".to_string(), vec![])
937            .await
938            .unwrap();
939
940        let consumer =
941            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
942        consumer.initialize().await.unwrap();
943
944        let entry = consumer.read(1).await.unwrap().unwrap();
945        assert_eq!(entry.location, "b.batch");
946        assert_eq!(entry.sequence, 1);
947    }
948
949    #[tokio::test]
950    async fn should_read_none_for_missing_sequence() {
951        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
952        let producer =
953            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
954
955        producer
956            .enqueue("a.batch".to_string(), vec![])
957            .await
958            .unwrap();
959
960        let consumer =
961            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
962        consumer.initialize().await.unwrap();
963
964        let result = consumer.read(99).await.unwrap();
965        assert!(result.is_none());
966    }
967
968    #[tokio::test]
969    async fn should_fence_old_consumer_on_peek() {
970        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
971        let consumer_a =
972            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
973        consumer_a.initialize().await.unwrap();
974
975        let consumer_b =
976            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
977        consumer_b.initialize().await.unwrap();
978
979        let result = consumer_a.peek().await;
980        assert!(matches!(result, Err(Error::Fenced)));
981    }
982
983    #[tokio::test]
984    async fn should_fence_old_consumer_on_dequeue() {
985        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
986        let consumer_a =
987            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
988        consumer_a.initialize().await.unwrap();
989
990        let consumer_b =
991            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
992        consumer_b.initialize().await.unwrap();
993
994        let result = consumer_a.dequeue(0).await;
995        assert!(matches!(result, Err(Error::Fenced)));
996    }
997
998    #[tokio::test]
999    async fn should_fence_uninitialized_consumer() {
1000        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1001        let producer =
1002            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1003
1004        producer
1005            .enqueue("a.batch".to_string(), vec![])
1006            .await
1007            .unwrap();
1008
1009        let consumer =
1010            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1011
1012        let result = consumer.peek().await;
1013        assert!(matches!(result, Err(Error::Fenced)));
1014    }
1015
1016    #[tokio::test]
1017    async fn should_wrap_epoch_to_zero_at_max() {
1018        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1019
1020        let mut manifest = Manifest::empty();
1021        manifest.set_epoch(u64::MAX - 1);
1022        let path = Path::from(TEST_MANIFEST_PATH);
1023        store
1024            .put(
1025                &path,
1026                PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
1027            )
1028            .await
1029            .unwrap();
1030
1031        let consumer =
1032            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1033        consumer.initialize().await.unwrap();
1034
1035        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1036        assert_eq!(manifest.epoch, 0);
1037    }
1038
1039    #[tokio::test]
1040    async fn should_peek_first_entry_with_valid_epoch() {
1041        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1042        let producer =
1043            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1044
1045        producer
1046            .enqueue("a.batch".to_string(), vec![])
1047            .await
1048            .unwrap();
1049        producer
1050            .enqueue("b.batch".to_string(), vec![])
1051            .await
1052            .unwrap();
1053
1054        let consumer =
1055            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1056        consumer.initialize().await.unwrap();
1057
1058        let entry = consumer.peek().await.unwrap().unwrap();
1059        assert_eq!(entry.location, "a.batch");
1060    }
1061
1062    #[tokio::test]
1063    async fn should_dequeue_entries_with_valid_epoch() {
1064        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1065        let producer =
1066            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1067
1068        producer
1069            .enqueue("a.batch".to_string(), vec![])
1070            .await
1071            .unwrap();
1072        producer
1073            .enqueue("b.batch".to_string(), vec![])
1074            .await
1075            .unwrap();
1076        producer
1077            .enqueue("c.batch".to_string(), vec![])
1078            .await
1079            .unwrap();
1080
1081        let consumer =
1082            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1083        consumer.initialize().await.unwrap();
1084
1085        let removed = consumer.dequeue(1).await.unwrap();
1086        assert_eq!(removed.len(), 2);
1087        assert_eq!(removed[0].location, "a.batch");
1088        assert_eq!(removed[1].location, "b.batch");
1089
1090        let next = consumer.peek().await.unwrap().unwrap();
1091        assert_eq!(next.location, "c.batch");
1092    }
1093
1094    #[tokio::test]
1095    async fn should_enqueue_after_consumer_dequeue() {
1096        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1097        let producer =
1098            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1099
1100        producer
1101            .enqueue("a.batch".to_string(), vec![])
1102            .await
1103            .unwrap();
1104        producer
1105            .enqueue("b.batch".to_string(), vec![])
1106            .await
1107            .unwrap();
1108
1109        let consumer =
1110            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1111        consumer.initialize().await.unwrap();
1112
1113        consumer.dequeue(1).await.unwrap();
1114
1115        producer
1116            .enqueue("c.batch".to_string(), vec![])
1117            .await
1118            .unwrap();
1119
1120        let next = consumer.peek().await.unwrap().unwrap();
1121        assert_eq!(next.location, "c.batch");
1122        assert_eq!(next.sequence, 2);
1123    }
1124
1125    #[tokio::test]
1126    async fn should_enqueue_locations_to_manifest() {
1127        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1128        let producer =
1129            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1130
1131        producer
1132            .enqueue("path/to/file1.batch".to_string(), vec![])
1133            .await
1134            .unwrap();
1135        producer
1136            .enqueue("path/to/file2.batch".to_string(), vec![])
1137            .await
1138            .unwrap();
1139
1140        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1141        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1142        assert_eq!(
1143            locations,
1144            vec!["path/to/file1.batch", "path/to/file2.batch"]
1145        );
1146    }
1147
1148    #[tokio::test]
1149    async fn should_merge_with_existing_manifest() {
1150        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1151
1152        let existing = Manifest::from_entries(&[QueueEntry {
1153            sequence: 0,
1154            location: "existing/file.batch".to_string(),
1155            metadata: vec![],
1156        }]);
1157        let path = Path::from(TEST_MANIFEST_PATH);
1158        store
1159            .put(
1160                &path,
1161                PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1162            )
1163            .await
1164            .unwrap();
1165
1166        let producer =
1167            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1168        producer
1169            .enqueue("new/file.batch".to_string(), vec![])
1170            .await
1171            .unwrap();
1172
1173        let manifest = read_producer_manifest(&store, "test/manifest").await;
1174        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1175        assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1176    }
1177
1178    fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1179        QueueEntry::new(location.to_string(), metadata).unwrap()
1180    }
1181
1182    fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1183        QueueEntry {
1184            sequence: seq,
1185            location: location.to_string(),
1186            metadata,
1187        }
1188    }
1189
1190    fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1191        Metadata {
1192            start_index,
1193            ingestion_time_ms: time_ms,
1194            payload: Bytes::from(data.to_string()),
1195        }
1196    }
1197
1198    fn collect_locations(manifest: &Manifest) -> Vec<String> {
1199        manifest.iter().map(|e| e.unwrap().location).collect()
1200    }
1201
1202    #[test]
1203    fn should_create_empty_manifest() {
1204        let m = Manifest::empty();
1205
1206        assert_eq!(m.entries_count(), 0);
1207        assert!(m.is_empty());
1208        assert_eq!(m.epoch, 0);
1209
1210        let bytes = m.to_bytes().unwrap();
1211        assert_eq!(bytes.len(), FOOTER_SIZE);
1212        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1213        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1214        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1215        assert_eq!(
1216            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1217            MANIFEST_VERSION
1218        );
1219    }
1220
1221    #[test]
1222    fn should_parse_valid_manifest_bytes() {
1223        let entries = vec![
1224            entry_seq(0, "a", vec![meta(0, 1, "x")]),
1225            entry_seq(1, "b", vec![meta(0, 2, "y")]),
1226        ];
1227        let data = Manifest::from_entries(&entries).to_bytes().unwrap();
1228
1229        let m = Manifest::from_bytes(data).unwrap();
1230
1231        assert_eq!(m.entries_count(), 2);
1232    }
1233
1234    #[test]
1235    fn should_parse_footer_only_bytes() {
1236        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1237        buf.put_u32_le(0);
1238        buf.put_u64_le(42);
1239        buf.put_u64_le(0);
1240        buf.put_u16_le(MANIFEST_VERSION);
1241
1242        let m = Manifest::from_bytes(buf.freeze()).unwrap();
1243
1244        assert_eq!(m.entries_count(), 0);
1245        assert_eq!(m.epoch, 0);
1246
1247        let mut m = m;
1248        m.append(&entry("loc", vec![])).unwrap();
1249        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1250        assert_eq!(entries[0].sequence, 42);
1251    }
1252
1253    #[test]
1254    fn should_reject_empty_bytes() {
1255        let err = Manifest::from_bytes(Bytes::new()).unwrap_err();
1256
1257        assert!(err.to_string().contains("must not be empty"));
1258    }
1259
1260    #[test]
1261    fn should_reject_bytes_too_short_for_footer() {
1262        let err = Manifest::from_bytes(Bytes::from_static(&[0; 21])).unwrap_err();
1263
1264        assert!(err.to_string().contains("too short for footer"));
1265    }
1266
1267    #[test]
1268    fn should_reject_wrong_version() {
1269        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1270        buf.put_u32_le(0);
1271        buf.put_u64_le(0);
1272        buf.put_u64_le(0);
1273        buf.put_u16_le(99);
1274
1275        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1276
1277        assert!(err.to_string().contains("unsupported"));
1278        assert!(err.to_string().contains("99"));
1279    }
1280
1281    #[test]
1282    fn should_reject_version_zero() {
1283        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1284        buf.put_u32_le(0);
1285        buf.put_u64_le(0);
1286        buf.put_u64_le(0);
1287        buf.put_u16_le(0);
1288
1289        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1290
1291        assert!(err.to_string().contains("unsupported"));
1292    }
1293
1294    #[test]
1295    fn should_make_appended_entry_accessible_via_iter() {
1296        let mut m = Manifest::empty();
1297
1298        m.append(&entry("loc", vec![meta(0, 42, "meta")])).unwrap();
1299
1300        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1301        assert_eq!(entries.len(), 1);
1302        assert_eq!(entries[0].sequence, 0);
1303        assert_eq!(entries[0].location, "loc");
1304        assert_eq!(entries[0].metadata, vec![meta(0, 42, "meta")]);
1305    }
1306
1307    #[test]
1308    fn should_append_to_existing_base_entries() {
1309        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1310        let data = base.to_bytes().unwrap();
1311        let mut m = Manifest::from_bytes(data).unwrap();
1312
1313        m.append(&entry("appended", vec![])).unwrap();
1314
1315        assert_eq!(m.entries_count(), 2);
1316        assert_eq!(collect_locations(&m), vec!["base", "appended"]);
1317        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1318        assert_eq!(entries[0].sequence, 0);
1319        assert_eq!(entries[1].sequence, 1);
1320    }
1321
1322    #[test]
1323    fn should_preserve_append_order() {
1324        let mut m = Manifest::empty();
1325
1326        m.append(&entry("a", vec![])).unwrap();
1327        m.append(&entry("b", vec![])).unwrap();
1328        m.append(&entry("c", vec![])).unwrap();
1329
1330        assert_eq!(collect_locations(&m), vec!["a", "b", "c"]);
1331        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1332        assert_eq!(entries[0].sequence, 0);
1333        assert_eq!(entries[1].sequence, 1);
1334        assert_eq!(entries[2].sequence, 2);
1335    }
1336
1337    #[test]
1338    fn should_handle_entry_with_empty_location() {
1339        let m = Manifest::from_entries(&[entry_seq(0, "", vec![])]);
1340
1341        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1342        assert_eq!(decoded[0].location, "");
1343        assert!(decoded[0].metadata.is_empty());
1344    }
1345
1346    #[test]
1347    fn should_handle_entry_with_large_metadata() {
1348        let big_meta = Bytes::from(vec![0xAB_u8; 1024]);
1349
1350        let m = Manifest::from_entries(&[entry_seq(
1351            0,
1352            "loc",
1353            vec![Metadata {
1354                start_index: 0,
1355                ingestion_time_ms: 1,
1356                payload: big_meta.clone(),
1357            }],
1358        )]);
1359
1360        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1361        assert_eq!(decoded[0].metadata.len(), 1);
1362        assert_eq!(decoded[0].metadata[0].payload, big_meta);
1363    }
1364
1365    #[test]
1366    fn should_handle_negative_ingestion_time() {
1367        let m = Manifest::from_entries(&[entry_seq(0, "loc", vec![meta(0, -1000, "")])]);
1368
1369        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1370        assert_eq!(decoded[0].metadata[0].ingestion_time_ms, -1000);
1371    }
1372
1373    #[test]
1374    fn should_return_footer_for_empty_manifest() {
1375        let m = Manifest::empty();
1376
1377        let bytes = m.to_bytes().unwrap();
1378
1379        assert_eq!(bytes.len(), FOOTER_SIZE);
1380        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1381        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1382        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1383        assert_eq!(
1384            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1385            MANIFEST_VERSION
1386        );
1387    }
1388
1389    #[test]
1390    fn should_merge_base_and_appended() {
1391        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1392        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1393        m.append(&entry("appended", vec![])).unwrap();
1394
1395        let serialized = m.to_bytes().unwrap();
1396        let reparsed = Manifest::from_bytes(serialized).unwrap();
1397
1398        assert_eq!(reparsed.entries_count(), 2);
1399        assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
1400        let entries: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1401        assert_eq!(entries[0].sequence, 0);
1402        assert_eq!(entries[1].sequence, 1);
1403    }
1404
1405    #[test]
1406    fn should_write_correct_footer_count() {
1407        let base = Manifest::from_entries(&[entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])]);
1408        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1409        m.append(&entry("c", vec![])).unwrap();
1410        m.append(&entry("d", vec![])).unwrap();
1411        m.append(&entry("e", vec![])).unwrap();
1412
1413        let bytes = m.to_bytes().unwrap();
1414
1415        let footer_start = bytes.len() - FOOTER_SIZE;
1416        let count = u32::from_le_bytes(bytes[footer_start..footer_start + 4].try_into().unwrap());
1417        let next_seq = u64::from_le_bytes(
1418            bytes[footer_start + 4..footer_start + 12]
1419                .try_into()
1420                .unwrap(),
1421        );
1422        let epoch = u64::from_le_bytes(
1423            bytes[footer_start + 12..footer_start + 20]
1424                .try_into()
1425                .unwrap(),
1426        );
1427        let version = u16::from_le_bytes(bytes[footer_start + 20..].try_into().unwrap());
1428        assert_eq!(count, 5);
1429        assert_eq!(next_seq, 5);
1430        assert_eq!(epoch, 0);
1431        assert_eq!(version, MANIFEST_VERSION);
1432    }
1433
1434    #[test]
1435    fn should_round_trip_from_entries_to_bytes_from_bytes() {
1436        let entries = vec![
1437            entry_seq(0, "a", vec![meta(0, 10, "m1")]),
1438            entry_seq(1, "b", vec![meta(0, 20, "m2")]),
1439        ];
1440        let original = Manifest::from_entries(&entries);
1441
1442        let reparsed = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1443
1444        assert_eq!(reparsed.entries_count(), 2);
1445        let decoded: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1446        assert_eq!(decoded[0].sequence, 0);
1447        assert_eq!(decoded[0].location, "a");
1448        assert_eq!(decoded[0].metadata, vec![meta(0, 10, "m1")]);
1449        assert_eq!(decoded[1].sequence, 1);
1450        assert_eq!(decoded[1].location, "b");
1451        assert_eq!(decoded[1].metadata, vec![meta(0, 20, "m2")]);
1452    }
1453
1454    #[test]
1455    fn should_round_trip_append_serialize_reparse() {
1456        let mut m = Manifest::empty();
1457        m.append(&entry("x", vec![meta(0, 100, "data")])).unwrap();
1458        m.append(&entry("y", vec![meta(0, 200, "more")])).unwrap();
1459
1460        let reparsed = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1461
1462        assert_eq!(reparsed.entries_count(), 2);
1463        assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);
1464    }
1465
1466    #[test]
1467    fn should_chain_serialize_reparse_append() {
1468        let original = Manifest::from_entries(&[entry_seq(0, "a", vec![])]);
1469        let mut m = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1470        m.append(&entry("b", vec![])).unwrap();
1471
1472        let mut m2 = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1473        m2.append(&entry("c", vec![])).unwrap();
1474
1475        let final_m = Manifest::from_bytes(m2.to_bytes().unwrap()).unwrap();
1476
1477        assert_eq!(final_m.entries_count(), 3);
1478        assert_eq!(collect_locations(&final_m), vec!["a", "b", "c"]);
1479        let entries: Vec<QueueEntry> = final_m.iter().map(|e| e.unwrap()).collect();
1480        assert_eq!(entries[2].sequence, 2);
1481    }
1482
1483    #[test]
1484    fn should_dequeue_entries_through_sequence() {
1485        let mut m = Manifest::empty();
1486        for _ in 0..5 {
1487            m.append(&entry("loc", vec![])).unwrap();
1488        }
1489
1490        let removed = m.dequeue(2).unwrap();
1491
1492        assert_eq!(removed.len(), 3);
1493        assert_eq!(removed[0].sequence, 0);
1494        assert_eq!(removed[1].sequence, 1);
1495        assert_eq!(removed[2].sequence, 2);
1496        assert_eq!(m.entries_count(), 2);
1497        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1498        assert_eq!(remaining[0].sequence, 3);
1499        assert_eq!(remaining[1].sequence, 4);
1500        assert_eq!(m.next_sequence, 5);
1501    }
1502
1503    #[test]
1504    fn should_dequeue_all_entries() {
1505        let mut m = Manifest::empty();
1506        for _ in 0..3 {
1507            m.append(&entry("loc", vec![])).unwrap();
1508        }
1509
1510        let removed = m.dequeue(2).unwrap();
1511
1512        assert_eq!(removed.len(), 3);
1513        assert!(m.is_empty());
1514        assert_eq!(m.next_sequence, 3);
1515    }
1516
1517    #[test]
1518    fn should_dequeue_nothing_when_sequence_below_first() {
1519        let entries = vec![
1520            entry_seq(5, "a", vec![]),
1521            entry_seq(6, "b", vec![]),
1522            entry_seq(7, "c", vec![]),
1523        ];
1524        let mut m = Manifest::from_entries(&entries);
1525
1526        let removed = m.dequeue(3).unwrap();
1527
1528        assert!(removed.is_empty());
1529        assert_eq!(m.entries_count(), 3);
1530    }
1531
1532    #[test]
1533    fn should_append_after_dequeue() {
1534        let mut m = Manifest::empty();
1535        for _ in 0..3 {
1536            m.append(&entry("loc", vec![])).unwrap();
1537        }
1538
1539        m.dequeue(0).unwrap();
1540
1541        assert_eq!(m.entries_count(), 2);
1542        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1543        assert_eq!(remaining[0].sequence, 1);
1544        assert_eq!(remaining[1].sequence, 2);
1545
1546        m.append(&entry("new", vec![])).unwrap();
1547        let all: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1548        assert_eq!(all.len(), 3);
1549        assert_eq!(all[2].sequence, 3);
1550    }
1551
1552    /// Serialize a single entry into raw bytes (without footer).
1553    fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1554        let mut buf = BytesMut::new();
1555        Manifest::encode_entry(&mut buf, entry).unwrap();
1556        buf.to_vec()
1557    }
1558
1559    /// Wrap raw entry bytes with a manifest footer (entry_count=1) so that
1560    /// `Manifest::from_bytes` + `.iter()` exercises `decode_entry`.
1561    fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1562        let mut buf = BytesMut::new();
1563        buf.extend_from_slice(entry_bytes);
1564        buf.put_u32_le(1); // entry_count
1565        buf.put_u64_le(1); // next_sequence
1566        buf.put_u64_le(0); // epoch
1567        buf.put_u16_le(MANIFEST_VERSION);
1568        Manifest::from_bytes(buf.freeze()).unwrap()
1569    }
1570
1571    /// Offset of metadata_count inside the encoded entry (for location "a").
1572    /// Layout: entry_len(4) + sequence(8) + location_len(2) + location(1)
1573    const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;
1574
1575    /// Build an entry with no metadata, then corrupt metadata_count to `count`
1576    /// and extend the buffer with `extra_bytes` after metadata_count to simulate
1577    /// partial metadata. Also patches entry_len to match the new total size.
1578    fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
1579        let e = QueueEntry {
1580            sequence: 1,
1581            location: "a".to_string(),
1582            metadata: vec![],
1583        };
1584        let mut raw = encode_entry_bytes(&e);
1585        // Overwrite metadata_count
1586        raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
1587        // Append extra bytes (partial metadata fields)
1588        raw.extend_from_slice(extra_bytes);
1589        // Patch entry_len to cover the full buffer after the 4-byte prefix
1590        let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
1591        raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
1592        raw
1593    }
1594
1595    #[test]
1596    fn should_reject_trailing_bytes_before_footer() {
1597        // Build a valid entry, then add garbage bytes before the footer.
1598        let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
1599        raw.extend_from_slice(&[0xFFu8; 5]); // trailing garbage
1600
1601        let manifest = manifest_from_raw_entry(&raw);
1602        let items: Vec<Result<QueueEntry>> = manifest.iter().collect();
1603        assert_eq!(items.len(), 2);
1604        assert!(items[0].is_ok());
1605        let err = items[1].as_ref().unwrap_err();
1606        assert!(
1607            err.to_string().contains("did not consume all bytes"),
1608            "got: {}",
1609            err
1610        );
1611    }
1612
1613    #[test]
1614    fn should_reject_entry_with_entry_len_below_minimum() {
1615        // entry_len too small: less than SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE (14)
1616        let bad_entry_len = (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE - 1) as u32;
1617        let mut raw = Vec::new();
1618        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1619        raw.extend_from_slice(&[0u8; 13]); // enough raw bytes to not truncate
1620
1621        let manifest = manifest_from_raw_entry(&raw);
1622        let err = manifest.iter().next().unwrap().unwrap_err();
1623        assert!(err.to_string().contains(
1624            "entry length 13 is less than minimum entry length 14 for the length of the location 0"
1625        ));
1626    }
1627
1628    #[test]
1629    fn should_reject_entry_with_entry_len_below_minimum_for_location() {
1630        let location = "abc";
1631        // entry_len covers fixed fields but not the full location
1632        let bad_entry_len =
1633            (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + location.len() - 1) as u32;
1634        let mut raw = Vec::new();
1635        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1636        raw.extend_from_slice(&0u64.to_le_bytes()); // sequence
1637        raw.extend_from_slice(&(location.len() as u16).to_le_bytes()); // location_len
1638        raw.extend_from_slice(&[0u8; 20]); // padding so entry doesn't extend beyond data
1639
1640        let manifest = manifest_from_raw_entry(&raw);
1641        let err = manifest.iter().next().unwrap().unwrap_err();
1642        assert!(
1643            err.to_string()
1644                .contains("entry length 16 is less than minimum entry length 17"),
1645            "got: {}",
1646            err
1647        );
1648    }
1649
1650    #[test]
1651    fn should_reject_entry_truncated_before_entry_len() {
1652        // Data too short to even read entry_len (need 4 bytes, provide 2)
1653        let manifest = manifest_from_raw_entry(&[0u8; 2]);
1654        let err = manifest.iter().next().unwrap().unwrap_err();
1655        assert!(
1656            matches!(&err, Error::Serialization(msg) if msg.contains("entry length field does not fit")),
1657            "unexpected error: {err}"
1658        );
1659    }
1660
1661    #[test]
1662    fn should_reject_entry_truncated_before_metadata_start_index() {
1663        // metadata_count = 1 but no metadata bytes at all
1664        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &[]));
1665        let err = manifest.iter().next().unwrap().unwrap_err();
1666        assert!(
1667            matches!(&err, Error::Serialization(msg) if msg.contains("start index field does not fit")),
1668            "unexpected error: {err}"
1669        );
1670    }
1671
1672    #[test]
1673    fn should_reject_entry_truncated_before_metadata_ingestion_time() {
1674        // metadata_count = 1, only start_index present (4 bytes)
1675        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &0u32.to_le_bytes()));
1676        let err = manifest.iter().next().unwrap().unwrap_err();
1677        assert!(
1678            matches!(&err, Error::Serialization(msg) if msg.contains("ingestion time field does not fit")),
1679            "unexpected error: {err}"
1680        );
1681    }
1682
1683    #[test]
1684    fn should_reject_entry_truncated_before_metadata_length() {
1685        // metadata_count = 1, start_index + ingestion_time present, but no m_len
1686        let mut extra = Vec::new();
1687        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1688        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1689        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1690        let err = manifest.iter().next().unwrap().unwrap_err();
1691        assert!(
1692            matches!(&err, Error::Serialization(msg) if msg.contains("metadata length field does not fit")),
1693            "unexpected error: {err}"
1694        );
1695    }
1696
1697    #[test]
1698    fn should_reject_entry_truncated_before_metadata_payload() {
1699        // metadata_count = 1, all fixed fields present, m_len says 10 but only 2 bytes follow
1700        let mut extra = Vec::new();
1701        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1702        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1703        extra.extend_from_slice(&10u32.to_le_bytes()); // m_len = 10
1704        extra.extend_from_slice(&[0xAB, 0xCD]); // only 2 payload bytes
1705        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1706        let err = manifest.iter().next().unwrap().unwrap_err();
1707        assert!(
1708            matches!(&err, Error::Serialization(msg) if msg.contains("metadata has less bytes than set")),
1709            "unexpected error: {err}"
1710        );
1711    }
1712
1713    #[tokio::test]
1714    async fn should_reject_location_exceeding_u16_max() {
1715        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1716        let producer =
1717            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1718
1719        let long_location = "x".repeat(u16::MAX as usize + 1);
1720        let result = producer.enqueue(long_location, vec![]).await;
1721        assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1722    }
1723}