Skip to main content

ingest/
queue.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7    Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// Binary format version written into, and required in, every manifest footer.
const MANIFEST_VERSION: u16 = 1;
/// Sentinel epoch held by a consumer before `initialize` succeeds;
/// `initialize` never writes this value into the manifest.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;

// Byte widths of the fixed-size fields of an encoded entry.
const ENTRY_LEN_SIZE: usize = std::mem::size_of::<u32>();
const LOCATION_LEN_SIZE: usize = std::mem::size_of::<u16>();
const INGESTION_TIME_MS_SIZE: usize = std::mem::size_of::<i64>();
const METADATA_LEN_SIZE: usize = std::mem::size_of::<u32>();
const START_INDEX_SIZE: usize = std::mem::size_of::<u32>();
const METADATA_COUNT_SIZE: usize = std::mem::size_of::<u32>();

// Byte widths of the footer fields.
const ENTRIES_COUNT_SIZE: usize = std::mem::size_of::<u32>();
const SEQUENCE_SIZE: usize = std::mem::size_of::<u64>();
const EPOCH_SIZE: usize = std::mem::size_of::<u64>();
const VERSION_SIZE: usize = std::mem::size_of::<u16>();
/// Footer layout, in order: entries count (u32) | next sequence (u64) |
/// epoch (u64) | format version (u16), all little-endian.
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
26/// Per-range metadata attached to a batch by the ingestor.
27#[derive(Debug, Clone, PartialEq)]
28pub struct Metadata {
29    /// Index of the first entry in the batch that this metadata range covers.
30    pub start_index: u32,
31    /// Wall-clock ingestion time in milliseconds since the Unix epoch.
32    pub ingestion_time_ms: i64,
33    /// Opaque metadata payload supplied by the caller of [`Ingestor::ingest`](crate::Ingestor::ingest).
34    pub payload: Bytes,
35}
36
37#[derive(Debug, Clone)]
38pub(crate) struct QueueEntry {
39    pub(crate) sequence: u64,
40    pub(crate) location: String,
41    pub(crate) metadata: Vec<Metadata>,
42}
43
44impl QueueEntry {
45    fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46        if location.len() > u16::MAX as usize {
47            return Err(Error::InvalidInput(format!(
48                "location length {} exceeds u16::MAX",
49                location.len()
50            )));
51        }
52        if metadata.len() > u32::MAX as usize {
53            return Err(Error::InvalidInput(format!(
54                "metadata count {} exceeds u32::MAX",
55                metadata.len()
56            )));
57        }
58        Ok(Self {
59            sequence: 0,
60            location,
61            metadata,
62        })
63    }
64
65    fn clone_with_sequence(&self, sequence: u64) -> Self {
66        Self {
67            sequence,
68            ..self.clone()
69        }
70    }
71}
72
73#[derive(Debug, Clone)]
74pub(crate) struct Manifest {
75    data: Bytes,
76    appended: BytesMut,
77    appended_count: usize,
78    next_sequence: u64,
79    epoch: u64,
80}
81
82impl Manifest {
83    /// Create an empty manifest with a valid footer.
84    fn empty() -> Self {
85        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
86        buf.put_u32_le(0);
87        buf.put_u64_le(0);
88        buf.put_u64_le(0);
89        buf.put_u16_le(MANIFEST_VERSION);
90        Self {
91            data: buf.freeze(),
92            appended: BytesMut::new(),
93            appended_count: 0,
94            next_sequence: 0,
95            epoch: 0,
96        }
97    }
98
99    /// Wrap raw binary data as a queue manifest, validating the footer.
100    pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
101        if data.is_empty() {
102            return Err(Error::Serialization(
103                "queue manifest data must not be empty".to_string(),
104            ));
105        }
106        if data.len() < FOOTER_SIZE {
107            return Err(Error::Serialization(
108                "queue manifest too short for footer".to_string(),
109            ));
110        }
111        let version_start = data.len() - VERSION_SIZE;
112        let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
113        if version != MANIFEST_VERSION {
114            return Err(Error::Serialization(format!(
115                "unsupported queue manifest version: {}",
116                version
117            )));
118        }
119        let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
120        let epoch = u64::from_le_bytes(
121            data[epoch_start..epoch_start + EPOCH_SIZE]
122                .try_into()
123                .unwrap(),
124        );
125        let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
126        let next_sequence = u64::from_le_bytes(
127            data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
128                .try_into()
129                .unwrap(),
130        );
131        Ok(Self {
132            data,
133            appended: BytesMut::new(),
134            appended_count: 0,
135            next_sequence,
136            epoch,
137        })
138    }
139
140    /// Build a manifest from a slice of entries.
141    #[cfg(test)]
142    fn from_entries(entries: &[QueueEntry]) -> Self {
143        let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
144        let mut buf = BytesMut::new();
145        for entry in entries {
146            Self::encode_entry(&mut buf, entry).unwrap();
147        }
148        buf.put_u32_le(entries.len() as u32);
149        buf.put_u64_le(next_sequence);
150        buf.put_u64_le(0);
151        buf.put_u16_le(MANIFEST_VERSION);
152        Self {
153            data: buf.freeze(),
154            appended: BytesMut::new(),
155            appended_count: 0,
156            next_sequence,
157            epoch: 0,
158        }
159    }
160
161    /// Number of entries (read from the footer, O(1)).
162    fn entries_count(&self) -> usize {
163        let base = self.existing_entries_count();
164        base + self.appended_count
165    }
166
167    /// Whether the manifest contains no entries.
168    #[cfg(test)]
169    fn is_empty(&self) -> bool {
170        self.entries_count() == 0
171    }
172
173    /// Return a borrowing iterator that lazily deserializes entries.
174    pub(crate) fn iter(&self) -> ManifestIter<'_> {
175        let base_count = self.existing_entries_count();
176        let entries_end = if self.data.is_empty() {
177            0
178        } else {
179            self.data.len() - FOOTER_SIZE
180        };
181        ManifestIter {
182            data: &self.data,
183            offset: 0,
184            remaining: base_count,
185            entries_end,
186            appended: &self.appended,
187            appended_offset: 0,
188            appended_remaining: self.appended_count,
189        }
190    }
191
192    fn existing_entries_count(&self) -> usize {
193        if self.data.is_empty() {
194            0
195        } else {
196            let footer_start = self.data.len() - FOOTER_SIZE;
197            u32::from_le_bytes(
198                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
199                    .try_into()
200                    .unwrap(),
201            ) as usize
202        }
203    }
204
205    /// Append a single entry without copying existing data.
206    /// The entry is encoded and stored internally; bytes are merged in `to_bytes()`.
207    /// The entry's sequence number is overwritten with the manifest's next sequence.
208    fn append(&mut self, entry: &QueueEntry) -> Result<()> {
209        let sequenced = entry.clone_with_sequence(self.next_sequence);
210        Self::encode_entry(&mut self.appended, &sequenced)?;
211        self.next_sequence += 1;
212        self.appended_count += 1;
213        Ok(())
214    }
215
216    /// Remove all entries with sequence <= `through_sequence`, returning them.
217    ///
218    /// Optimized to avoid deserializing/re-serializing remaining entries: only the
219    /// removed entries are fully decoded, while remaining entries are byte-copied.
220    fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
221        let next_seq = self.next_sequence;
222        let epoch = self.epoch;
223
224        let base_count = self.existing_entries_count();
225        let entries_end = if self.data.is_empty() {
226            0
227        } else {
228            self.data.len() - FOOTER_SIZE
229        };
230
231        let (mut removed, remaining_base_start, remaining_base_count) =
232            split_entries(&self.data, base_count, entries_end, through_sequence)?;
233
234        let appended_end = self.appended.len();
235        let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
236            &self.appended,
237            self.appended_count,
238            appended_end,
239            through_sequence,
240        )?;
241        removed.extend(appended_removed);
242
243        let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
244        let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
245        let total_remaining = remaining_base_count + remaining_appended_count;
246
247        let mut buf = BytesMut::with_capacity(
248            remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
249        );
250        buf.extend_from_slice(remaining_base_bytes);
251        buf.extend_from_slice(remaining_appended_bytes);
252        buf.put_u32_le(total_remaining);
253        buf.put_u64_le(next_seq);
254        buf.put_u64_le(epoch);
255        buf.put_u16_le(MANIFEST_VERSION);
256
257        self.data = buf.freeze();
258        self.appended = BytesMut::new();
259        self.appended_count = 0;
260        self.next_sequence = next_seq;
261        self.epoch = epoch;
262
263        Ok(removed)
264    }
265
266    /// Set the epoch and patch the data bytes in place.
267    fn set_epoch(&mut self, epoch: u64) {
268        self.epoch = epoch;
269        let mut buf = BytesMut::from(self.data.as_ref());
270        let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
271        buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
272        self.data = buf.freeze();
273    }
274
275    /// Serialize the manifest to bytes for writing to object storage.
276    /// When an entry was appended, this merges existing data with the appended
277    /// entry and writes a new footer.
278    fn to_bytes(&self) -> Result<Bytes> {
279        if self.appended.is_empty() {
280            return Ok(self.data.clone());
281        }
282        let (prefix, base_count) = if self.data.is_empty() {
283            (&[] as &[u8], 0u32)
284        } else {
285            let footer_start = self.data.len() - FOOTER_SIZE;
286            let count = u32::from_le_bytes(
287                self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
288                    .try_into()
289                    .unwrap(),
290            );
291            (&self.data[..footer_start], count)
292        };
293        let total_count: u32 = base_count
294            .checked_add(self.appended_count as u32)
295            .ok_or_else(|| {
296                Error::Serialization(format!(
297                    "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
298                    base_count, self.appended_count
299                ))
300            })?;
301        let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
302        buf.extend_from_slice(prefix);
303        buf.extend_from_slice(&self.appended);
304        buf.put_u32_le(total_count);
305        buf.put_u64_le(self.next_sequence);
306        buf.put_u64_le(self.epoch);
307        buf.put_u16_le(MANIFEST_VERSION);
308        Ok(buf.freeze())
309    }
310
311    fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
312        debug_assert!(entry.location.len() <= u16::MAX as usize);
313        let metadata_size: usize = METADATA_COUNT_SIZE
314            + entry
315                .metadata
316                .iter()
317                .map(|m| {
318                    START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
319                })
320                .sum::<usize>();
321        let entry_body_len =
322            SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
323        debug_assert!(entry_body_len <= u32::MAX as usize);
324        buf.put_u32_le(entry_body_len as u32);
325        buf.put_u64_le(entry.sequence);
326        buf.put_u16_le(entry.location.len() as u16);
327        buf.extend_from_slice(entry.location.as_bytes());
328        debug_assert!(entry.metadata.len() <= u32::MAX as usize);
329        buf.put_u32_le(entry.metadata.len() as u32);
330        for m in &entry.metadata {
331            if m.payload.len() > u32::MAX as usize {
332                return Err(Error::InvalidInput(format!(
333                    "metadata payload size {} exceeds u32::MAX",
334                    m.payload.len()
335                )));
336            }
337            buf.put_u32_le(m.start_index);
338            buf.put_i64_le(m.ingestion_time_ms);
339            buf.put_u32_le(m.payload.len() as u32);
340            buf.extend_from_slice(&m.payload);
341        }
342        Ok(())
343    }
344}
345
346/// Walk entries in `data[0..end]`, splitting at `through_sequence`.
347/// Entries with sequence <= through_sequence are fully decoded and returned.
348/// Returns (removed_entries, remaining_start_offset, remaining_count).
349fn split_entries(
350    data: &[u8],
351    count: usize,
352    end: usize,
353    through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355    let mut removed = Vec::new();
356    let mut offset = 0usize;
357
358    for i in 0..count {
359        let entry_start = offset;
360        let entry = decode_entry(data, &mut offset, end)?;
361
362        if entry.sequence <= through_sequence {
363            removed.push(entry);
364        } else {
365            return Ok((removed, entry_start, (count - i) as u32));
366        }
367    }
368
369    Ok((removed, end, 0))
370}
371
372/// Decode a single entry from binary data at the given offset.
373fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374    if *offset + ENTRY_LEN_SIZE > end {
375        return Err(Error::Serialization(
376            "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377        ));
378    }
379
380    let entry_len =
381        u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382    *offset += ENTRY_LEN_SIZE;
383
384    if *offset + entry_len > end {
385        return Err(Error::Serialization(
386            "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387        ));
388    }
389
390    let entry_end = *offset + entry_len;
391
392    let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393    *offset += SEQUENCE_SIZE;
394
395    let location_len = u16::from_le_bytes(
396        data[*offset..*offset + LOCATION_LEN_SIZE]
397            .try_into()
398            .unwrap(),
399    ) as usize;
400    *offset += LOCATION_LEN_SIZE;
401
402    let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403    if entry_len < min_entry_len {
404        return Err(Error::Serialization(format!(
405            "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406            entry_len, min_entry_len, location_len
407        )));
408    }
409
410    let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411        .map_err(|e| Error::Serialization(e.to_string()))?;
412    *offset += location_len;
413
414    let metadata_count = u32::from_le_bytes(
415        data[*offset..*offset + METADATA_COUNT_SIZE]
416            .try_into()
417            .unwrap(),
418    ) as usize;
419    *offset += METADATA_COUNT_SIZE;
420
421    let mut metadata = Vec::with_capacity(metadata_count);
422    for _ in 0..metadata_count {
423        if *offset + START_INDEX_SIZE > end {
424            return Err(Error::Serialization(
425                "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426            ));
427        }
428        let start_index = u32::from_le_bytes(
429            data[*offset..*offset + START_INDEX_SIZE]
430                .try_into()
431                .unwrap(),
432        );
433        *offset += START_INDEX_SIZE;
434
435        if *offset + INGESTION_TIME_MS_SIZE > end {
436            return Err(Error::Serialization(
437                "queue entry corrupt: size of ingestion time field does not fit in entry"
438                    .to_string(),
439            ));
440        }
441        let ingestion_time_ms = i64::from_le_bytes(
442            data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443                .try_into()
444                .unwrap(),
445        );
446        *offset += INGESTION_TIME_MS_SIZE;
447
448        if *offset + METADATA_LEN_SIZE > end {
449            return Err(Error::Serialization(
450                "queue entry corrupt: size of metadata length field does not fit in entry"
451                    .to_string(),
452            ));
453        }
454        let m_len = u32::from_le_bytes(
455            data[*offset..*offset + METADATA_LEN_SIZE]
456                .try_into()
457                .unwrap(),
458        ) as usize;
459        *offset += METADATA_LEN_SIZE;
460
461        if *offset + m_len > end {
462            return Err(Error::Serialization(
463                "queue entry corrupt: metadata has less bytes than set in the metadata length"
464                    .to_string(),
465            ));
466        }
467        metadata.push(Metadata {
468            start_index,
469            ingestion_time_ms,
470            payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471        });
472        *offset += m_len;
473    }
474
475    *offset = entry_end;
476
477    Ok(QueueEntry {
478        sequence,
479        location,
480        metadata,
481    })
482}
483
/// Borrowing iterator over manifest entries, decoding one entry per `next`.
///
/// Entries stored in the manifest's `data` are yielded first, followed by the
/// locally appended entries.
pub(crate) struct ManifestIter<'a> {
    /// Encoded manifest data (entries followed by the footer).
    data: &'a [u8],
    /// Read position within `data`.
    offset: usize,
    /// Entries of `data` not yet yielded.
    remaining: usize,
    /// Offset in `data` where the entries end and the footer begins.
    entries_end: usize,
    /// Encoded locally appended entries (no footer).
    appended: &'a [u8],
    /// Read position within `appended`.
    appended_offset: usize,
    /// Appended entries not yet yielded.
    appended_remaining: usize,
}
494
495impl Iterator for ManifestIter<'_> {
496    type Item = Result<QueueEntry>;
497
498    fn next(&mut self) -> Option<Self::Item> {
499        if self.remaining > 0 {
500            self.remaining -= 1;
501            Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502        } else if self.appended_remaining > 0 {
503            self.appended_remaining -= 1;
504            Some(decode_entry(
505                self.appended,
506                &mut self.appended_offset,
507                self.appended.len(),
508            ))
509        } else if self.offset != self.entries_end {
510            let err = Some(Err(Error::Serialization(format!(
511                "base entries did not consume all bytes: offset {} != entries_end {}",
512                self.offset, self.entries_end
513            ))));
514            self.offset = self.entries_end;
515            err
516        } else {
517            None
518        }
519    }
520}
521
522enum ManifestWriteError {
523    Conflict,
524    Fatal(Error),
525}
526
527#[derive(Clone)]
528struct ManifestStore {
529    object_store: Arc<dyn ObjectStore>,
530    manifest_path: String,
531}
532
533impl ManifestStore {
534    async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
535        let path = Path::from(self.manifest_path.as_str());
536        match self.object_store.get(&path).await {
537            Ok(result) => {
538                let version = UpdateVersion {
539                    e_tag: result.meta.e_tag.clone(),
540                    version: result.meta.version.clone(),
541                };
542                let bytes = result
543                    .bytes()
544                    .await
545                    .map_err(|e| Error::Storage(e.to_string()))?;
546                let manifest = Manifest::from_bytes(bytes)?;
547                Ok((manifest, Some(version)))
548            }
549            Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
550            Err(e) => Err(Error::Storage(e.to_string())),
551        }
552    }
553
554    async fn write(
555        &self,
556        manifest: &Manifest,
557        version: Option<UpdateVersion>,
558    ) -> std::result::Result<(), ManifestWriteError> {
559        let path = Path::from(self.manifest_path.as_str());
560        let put_mode = match version {
561            Some(v) => PutMode::Update(v),
562            None => PutMode::Create,
563        };
564        let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;
565
566        match self
567            .object_store
568            .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
569            .await
570        {
571            Ok(_) => Ok(()),
572            Err(ObjectStoreError::Precondition { .. })
573            | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
574            Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
575        }
576    }
577}
578
/// Counts attempted manifest writes and how many of them hit a conflict.
///
/// The counters are statistics only, so relaxed atomic ordering is used.
struct ConflictCounter {
    write_count: AtomicU64,
    conflict_count: AtomicU64,
}

impl ConflictCounter {
    /// Create a counter with no recorded writes or conflicts.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            write_count: zero(),
            conflict_count: zero(),
        }
    }

    /// Record one attempted manifest write.
    fn record_write(&self) {
        self.write_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record one write that was rejected as a conflict.
    fn record_conflict(&self) {
        self.conflict_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Percentage of recorded writes that conflicted.
    ///
    /// Returns 0.0 before any write is recorded. The result is clamped to
    /// 100.0.
    fn conflict_rate(&self) -> f64 {
        let writes = self.write_count.load(Ordering::Relaxed);
        match writes {
            0 => 0.0,
            _ => {
                let conflicts = self.conflict_count.load(Ordering::Relaxed);
                let ratio = conflicts as f64 / writes as f64;
                f64::min(ratio * 100.0, 100.0)
            }
        }
    }
}
610
611/// A producer that appends entries to a shared manifest in object storage.
612///
613/// Writes use optimistic concurrency: the manifest is read, modified locally,
614/// and written back with a conditional put. On conflict the operation is
615/// retried automatically until it succeeds.
616pub struct QueueProducer {
617    manifest_store: ManifestStore,
618    counter: ConflictCounter,
619}
620
621impl QueueProducer {
622    /// Create a new producer backed by the given [`ObjectStore`].
623    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
624        Self {
625            manifest_store: ManifestStore {
626                object_store,
627                manifest_path,
628            },
629            counter: ConflictCounter::new(),
630        }
631    }
632
633    /// Append an entry to the queue with the given `location` and `metadata`.
634    ///
635    /// Returns [`Error::InvalidInput`] if `location` exceeds 65 535 bytes or
636    /// `metadata` exceeds 2³²−1 items. The write is retried automatically on
637    /// optimistic-concurrency conflicts.
638    pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
639        let entry = QueueEntry::new(location, metadata)?;
640        loop {
641            let (mut manifest, version) = self.manifest_store.read().await?;
642            manifest.append(&entry)?;
643            self.counter.record_write();
644            match self.manifest_store.write(&manifest, version).await {
645                Ok(()) => return Ok(()),
646                Err(ManifestWriteError::Conflict) => {
647                    self.counter.record_conflict();
648                    continue;
649                }
650                Err(ManifestWriteError::Fatal(e)) => return Err(e),
651            }
652        }
653    }
654
655    /// Return the percentage of manifest writes that encountered a conflict.
656    pub fn conflict_rate(&self) -> f64 {
657        self.counter.conflict_rate()
658    }
659}
660
661/// A consumer that reads and dequeues entries from a shared manifest in
662/// object storage.
663///
664/// Single-consumer semantics are enforced through an epoch stored in the
665/// manifest. Calling [`QueueConsumer::initialize`] increments the epoch,
666/// fencing any previous consumer instance. Every subsequent read or dequeue
667/// checks that the local epoch still matches the manifest, returning
668/// [`Error::Fenced`] if another consumer has taken over.
669pub struct QueueConsumer {
670    manifest_store: ManifestStore,
671    epoch: AtomicU64,
672    counter: ConflictCounter,
673    queue_len: AtomicU64,
674}
675
676impl QueueConsumer {
677    /// Create a new consumer backed by the given [`ObjectStore`].
678    ///
679    /// The consumer is not active until [`QueueConsumer::initialize`] is called.
680    pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
681        Self {
682            manifest_store: ManifestStore {
683                object_store,
684                manifest_path,
685            },
686            epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
687            counter: ConflictCounter::new(),
688            queue_len: AtomicU64::new(0),
689        }
690    }
691
692    /// Initialize the consumer by incrementing the epoch in the queue manifest.
693    /// This fences any previous consumer that was using the old epoch.
694    pub async fn initialize(&self) -> Result<()> {
695        loop {
696            let (mut manifest, version) = self.read_manifest().await?;
697            let mut new_epoch = manifest.epoch.wrapping_add(1);
698            if new_epoch == UNINITIALIZED_EPOCH {
699                new_epoch = new_epoch.wrapping_add(1);
700            }
701            manifest.set_epoch(new_epoch);
702            match self.write_manifest(&manifest, version).await {
703                Ok(()) => {
704                    self.epoch.store(new_epoch, Ordering::Relaxed);
705                    return Ok(());
706                }
707                Err(ManifestWriteError::Conflict) => {
708                    self.counter.record_conflict();
709                    continue;
710                }
711                Err(ManifestWriteError::Fatal(e)) => return Err(e),
712            }
713        }
714    }
715
716    /// Return the first entry in the queue without dequeueing it.
717    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
718    pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
719        let (manifest, _) = self.read_manifest().await?;
720        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
721            return Err(Error::Fenced);
722        }
723        manifest.iter().next().transpose()
724    }
725
726    /// Return the entry with the given sequence number, or None if not found.
727    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
728    pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
729        let (manifest, _) = self.read_manifest().await?;
730        if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
731            return Err(Error::Fenced);
732        }
733        manifest
734            .iter()
735            .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
736            .transpose()
737    }
738
739    /// Remove all entries with sequence <= `through_sequence`, returning the removed entries.
740    /// Returns `Fenced` if the consumer's epoch does not match the manifest's epoch.
741    pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
742        loop {
743            let (mut manifest, version) = self.read_manifest().await?;
744            if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
745                return Err(Error::Fenced);
746            }
747            let removed = manifest.dequeue(through_sequence)?;
748            match self.write_manifest(&manifest, version).await {
749                Ok(()) => return Ok(removed),
750                Err(ManifestWriteError::Conflict) => {
751                    self.counter.record_conflict();
752                    continue;
753                }
754                Err(ManifestWriteError::Fatal(e)) => return Err(e),
755            }
756        }
757    }
758
759    /// Return the number of entries in the queue as of the last manifest read or write.
760    pub fn len(&self) -> usize {
761        self.queue_len.load(Ordering::Relaxed) as usize
762    }
763
764    async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
765        let result = self.manifest_store.read().await?;
766        self.queue_len
767            .store(result.0.entries_count() as u64, Ordering::Relaxed);
768        Ok(result)
769    }
770
771    async fn write_manifest(
772        &self,
773        manifest: &Manifest,
774        version: Option<UpdateVersion>,
775    ) -> std::result::Result<(), ManifestWriteError> {
776        self.counter.record_write();
777        let result = self.manifest_store.write(manifest, version).await;
778        if result.is_ok() {
779            self.queue_len
780                .store(manifest.entries_count() as u64, Ordering::Relaxed);
781        }
782        result
783    }
784
785    /// Return the percentage of manifest writes that encountered a conflict.
786    pub fn conflict_rate(&self) -> f64 {
787        self.counter.conflict_rate()
788    }
789}
790
791#[cfg(test)]
792mod tests {
793    use super::*;
794    use slatedb::object_store::memory::InMemory;
795
796    const TEST_MANIFEST_PATH: &str = "test/manifest";
797
798    async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
799        let path = Path::from(path);
800        let result = store.get(&path).await.unwrap();
801        let bytes = result.bytes().await.unwrap();
802        Manifest::from_bytes(bytes).unwrap()
803    }
804
805    #[tokio::test]
806    async fn should_initialize_consumer_and_increment_epoch() {
807        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
808        let consumer =
809            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
810
811        consumer.initialize().await.unwrap();
812
813        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
814        assert_eq!(manifest.epoch, 1);
815    }
816
817    #[tokio::test]
818    async fn should_peek_none_when_queue_is_empty() {
819        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
820        let consumer =
821            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
822        consumer.initialize().await.unwrap();
823
824        let result = consumer.peek().await.unwrap();
825        assert!(result.is_none());
826    }
827
828    #[tokio::test]
829    async fn should_read_entry_by_sequence() {
830        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
831        let producer =
832            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
833
834        producer
835            .enqueue("a.batch".to_string(), vec![])
836            .await
837            .unwrap();
838        producer
839            .enqueue("b.batch".to_string(), vec![])
840            .await
841            .unwrap();
842        producer
843            .enqueue("c.batch".to_string(), vec![])
844            .await
845            .unwrap();
846
847        let consumer =
848            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
849        consumer.initialize().await.unwrap();
850
851        let entry = consumer.read(1).await.unwrap().unwrap();
852        assert_eq!(entry.location, "b.batch");
853        assert_eq!(entry.sequence, 1);
854    }
855
856    #[tokio::test]
857    async fn should_read_none_for_missing_sequence() {
858        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
859        let producer =
860            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
861
862        producer
863            .enqueue("a.batch".to_string(), vec![])
864            .await
865            .unwrap();
866
867        let consumer =
868            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
869        consumer.initialize().await.unwrap();
870
871        let result = consumer.read(99).await.unwrap();
872        assert!(result.is_none());
873    }
874
875    #[tokio::test]
876    async fn should_fence_old_consumer_on_peek() {
877        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
878        let consumer_a =
879            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
880        consumer_a.initialize().await.unwrap();
881
882        let consumer_b =
883            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
884        consumer_b.initialize().await.unwrap();
885
886        let result = consumer_a.peek().await;
887        assert!(matches!(result, Err(Error::Fenced)));
888    }
889
890    #[tokio::test]
891    async fn should_fence_old_consumer_on_dequeue() {
892        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
893        let consumer_a =
894            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
895        consumer_a.initialize().await.unwrap();
896
897        let consumer_b =
898            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
899        consumer_b.initialize().await.unwrap();
900
901        let result = consumer_a.dequeue(0).await;
902        assert!(matches!(result, Err(Error::Fenced)));
903    }
904
905    #[tokio::test]
906    async fn should_fence_uninitialized_consumer() {
907        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
908        let producer =
909            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
910
911        producer
912            .enqueue("a.batch".to_string(), vec![])
913            .await
914            .unwrap();
915
916        let consumer =
917            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
918
919        let result = consumer.peek().await;
920        assert!(matches!(result, Err(Error::Fenced)));
921    }
922
923    #[tokio::test]
924    async fn should_wrap_epoch_to_zero_at_max() {
925        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
926
927        let mut manifest = Manifest::empty();
928        manifest.set_epoch(u64::MAX - 1);
929        let path = Path::from(TEST_MANIFEST_PATH);
930        store
931            .put(
932                &path,
933                PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
934            )
935            .await
936            .unwrap();
937
938        let consumer =
939            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
940        consumer.initialize().await.unwrap();
941
942        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
943        assert_eq!(manifest.epoch, 0);
944    }
945
946    #[tokio::test]
947    async fn should_peek_first_entry_with_valid_epoch() {
948        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
949        let producer =
950            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
951
952        producer
953            .enqueue("a.batch".to_string(), vec![])
954            .await
955            .unwrap();
956        producer
957            .enqueue("b.batch".to_string(), vec![])
958            .await
959            .unwrap();
960
961        let consumer =
962            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
963        consumer.initialize().await.unwrap();
964
965        let entry = consumer.peek().await.unwrap().unwrap();
966        assert_eq!(entry.location, "a.batch");
967    }
968
969    #[tokio::test]
970    async fn should_dequeue_entries_with_valid_epoch() {
971        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
972        let producer =
973            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
974
975        producer
976            .enqueue("a.batch".to_string(), vec![])
977            .await
978            .unwrap();
979        producer
980            .enqueue("b.batch".to_string(), vec![])
981            .await
982            .unwrap();
983        producer
984            .enqueue("c.batch".to_string(), vec![])
985            .await
986            .unwrap();
987
988        let consumer =
989            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
990        consumer.initialize().await.unwrap();
991
992        let removed = consumer.dequeue(1).await.unwrap();
993        assert_eq!(removed.len(), 2);
994        assert_eq!(removed[0].location, "a.batch");
995        assert_eq!(removed[1].location, "b.batch");
996
997        let next = consumer.peek().await.unwrap().unwrap();
998        assert_eq!(next.location, "c.batch");
999    }
1000
1001    #[tokio::test]
1002    async fn should_enqueue_after_consumer_dequeue() {
1003        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1004        let producer =
1005            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1006
1007        producer
1008            .enqueue("a.batch".to_string(), vec![])
1009            .await
1010            .unwrap();
1011        producer
1012            .enqueue("b.batch".to_string(), vec![])
1013            .await
1014            .unwrap();
1015
1016        let consumer =
1017            QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1018        consumer.initialize().await.unwrap();
1019
1020        consumer.dequeue(1).await.unwrap();
1021
1022        producer
1023            .enqueue("c.batch".to_string(), vec![])
1024            .await
1025            .unwrap();
1026
1027        let next = consumer.peek().await.unwrap().unwrap();
1028        assert_eq!(next.location, "c.batch");
1029        assert_eq!(next.sequence, 2);
1030    }
1031
1032    #[tokio::test]
1033    async fn should_enqueue_locations_to_manifest() {
1034        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1035        let producer =
1036            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1037
1038        producer
1039            .enqueue("path/to/file1.batch".to_string(), vec![])
1040            .await
1041            .unwrap();
1042        producer
1043            .enqueue("path/to/file2.batch".to_string(), vec![])
1044            .await
1045            .unwrap();
1046
1047        let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1048        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1049        assert_eq!(
1050            locations,
1051            vec!["path/to/file1.batch", "path/to/file2.batch"]
1052        );
1053    }
1054
1055    #[tokio::test]
1056    async fn should_merge_with_existing_manifest() {
1057        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1058
1059        let existing = Manifest::from_entries(&[QueueEntry {
1060            sequence: 0,
1061            location: "existing/file.batch".to_string(),
1062            metadata: vec![],
1063        }]);
1064        let path = Path::from(TEST_MANIFEST_PATH);
1065        store
1066            .put(
1067                &path,
1068                PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1069            )
1070            .await
1071            .unwrap();
1072
1073        let producer =
1074            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1075        producer
1076            .enqueue("new/file.batch".to_string(), vec![])
1077            .await
1078            .unwrap();
1079
1080        let manifest = read_producer_manifest(&store, "test/manifest").await;
1081        let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1082        assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1083    }
1084
1085    fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1086        QueueEntry::new(location.to_string(), metadata).unwrap()
1087    }
1088
1089    fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1090        QueueEntry {
1091            sequence: seq,
1092            location: location.to_string(),
1093            metadata,
1094        }
1095    }
1096
1097    fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1098        Metadata {
1099            start_index,
1100            ingestion_time_ms: time_ms,
1101            payload: Bytes::from(data.to_string()),
1102        }
1103    }
1104
1105    fn collect_locations(manifest: &Manifest) -> Vec<String> {
1106        manifest.iter().map(|e| e.unwrap().location).collect()
1107    }
1108
1109    #[test]
1110    fn should_create_empty_manifest() {
1111        let m = Manifest::empty();
1112
1113        assert_eq!(m.entries_count(), 0);
1114        assert!(m.is_empty());
1115        assert_eq!(m.epoch, 0);
1116
1117        let bytes = m.to_bytes().unwrap();
1118        assert_eq!(bytes.len(), FOOTER_SIZE);
1119        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1120        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1121        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1122        assert_eq!(
1123            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1124            MANIFEST_VERSION
1125        );
1126    }
1127
1128    #[test]
1129    fn should_parse_valid_manifest_bytes() {
1130        let entries = vec![
1131            entry_seq(0, "a", vec![meta(0, 1, "x")]),
1132            entry_seq(1, "b", vec![meta(0, 2, "y")]),
1133        ];
1134        let data = Manifest::from_entries(&entries).to_bytes().unwrap();
1135
1136        let m = Manifest::from_bytes(data).unwrap();
1137
1138        assert_eq!(m.entries_count(), 2);
1139    }
1140
1141    #[test]
1142    fn should_parse_footer_only_bytes() {
1143        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1144        buf.put_u32_le(0);
1145        buf.put_u64_le(42);
1146        buf.put_u64_le(0);
1147        buf.put_u16_le(MANIFEST_VERSION);
1148
1149        let m = Manifest::from_bytes(buf.freeze()).unwrap();
1150
1151        assert_eq!(m.entries_count(), 0);
1152        assert_eq!(m.epoch, 0);
1153
1154        let mut m = m;
1155        m.append(&entry("loc", vec![])).unwrap();
1156        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1157        assert_eq!(entries[0].sequence, 42);
1158    }
1159
1160    #[test]
1161    fn should_reject_empty_bytes() {
1162        let err = Manifest::from_bytes(Bytes::new()).unwrap_err();
1163
1164        assert!(err.to_string().contains("must not be empty"));
1165    }
1166
1167    #[test]
1168    fn should_reject_bytes_too_short_for_footer() {
1169        let err = Manifest::from_bytes(Bytes::from_static(&[0; 21])).unwrap_err();
1170
1171        assert!(err.to_string().contains("too short for footer"));
1172    }
1173
1174    #[test]
1175    fn should_reject_wrong_version() {
1176        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1177        buf.put_u32_le(0);
1178        buf.put_u64_le(0);
1179        buf.put_u64_le(0);
1180        buf.put_u16_le(99);
1181
1182        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1183
1184        assert!(err.to_string().contains("unsupported"));
1185        assert!(err.to_string().contains("99"));
1186    }
1187
1188    #[test]
1189    fn should_reject_version_zero() {
1190        let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1191        buf.put_u32_le(0);
1192        buf.put_u64_le(0);
1193        buf.put_u64_le(0);
1194        buf.put_u16_le(0);
1195
1196        let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1197
1198        assert!(err.to_string().contains("unsupported"));
1199    }
1200
1201    #[test]
1202    fn should_make_appended_entry_accessible_via_iter() {
1203        let mut m = Manifest::empty();
1204
1205        m.append(&entry("loc", vec![meta(0, 42, "meta")])).unwrap();
1206
1207        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1208        assert_eq!(entries.len(), 1);
1209        assert_eq!(entries[0].sequence, 0);
1210        assert_eq!(entries[0].location, "loc");
1211        assert_eq!(entries[0].metadata, vec![meta(0, 42, "meta")]);
1212    }
1213
1214    #[test]
1215    fn should_append_to_existing_base_entries() {
1216        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1217        let data = base.to_bytes().unwrap();
1218        let mut m = Manifest::from_bytes(data).unwrap();
1219
1220        m.append(&entry("appended", vec![])).unwrap();
1221
1222        assert_eq!(m.entries_count(), 2);
1223        assert_eq!(collect_locations(&m), vec!["base", "appended"]);
1224        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1225        assert_eq!(entries[0].sequence, 0);
1226        assert_eq!(entries[1].sequence, 1);
1227    }
1228
1229    #[test]
1230    fn should_preserve_append_order() {
1231        let mut m = Manifest::empty();
1232
1233        m.append(&entry("a", vec![])).unwrap();
1234        m.append(&entry("b", vec![])).unwrap();
1235        m.append(&entry("c", vec![])).unwrap();
1236
1237        assert_eq!(collect_locations(&m), vec!["a", "b", "c"]);
1238        let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1239        assert_eq!(entries[0].sequence, 0);
1240        assert_eq!(entries[1].sequence, 1);
1241        assert_eq!(entries[2].sequence, 2);
1242    }
1243
1244    #[test]
1245    fn should_handle_entry_with_empty_location() {
1246        let m = Manifest::from_entries(&[entry_seq(0, "", vec![])]);
1247
1248        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1249        assert_eq!(decoded[0].location, "");
1250        assert!(decoded[0].metadata.is_empty());
1251    }
1252
1253    #[test]
1254    fn should_handle_entry_with_large_metadata() {
1255        let big_meta = Bytes::from(vec![0xAB_u8; 1024]);
1256
1257        let m = Manifest::from_entries(&[entry_seq(
1258            0,
1259            "loc",
1260            vec![Metadata {
1261                start_index: 0,
1262                ingestion_time_ms: 1,
1263                payload: big_meta.clone(),
1264            }],
1265        )]);
1266
1267        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1268        assert_eq!(decoded[0].metadata.len(), 1);
1269        assert_eq!(decoded[0].metadata[0].payload, big_meta);
1270    }
1271
1272    #[test]
1273    fn should_handle_negative_ingestion_time() {
1274        let m = Manifest::from_entries(&[entry_seq(0, "loc", vec![meta(0, -1000, "")])]);
1275
1276        let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1277        assert_eq!(decoded[0].metadata[0].ingestion_time_ms, -1000);
1278    }
1279
1280    #[test]
1281    fn should_return_footer_for_empty_manifest() {
1282        let m = Manifest::empty();
1283
1284        let bytes = m.to_bytes().unwrap();
1285
1286        assert_eq!(bytes.len(), FOOTER_SIZE);
1287        assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1288        assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1289        assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1290        assert_eq!(
1291            u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1292            MANIFEST_VERSION
1293        );
1294    }
1295
1296    #[test]
1297    fn should_merge_base_and_appended() {
1298        let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1299        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1300        m.append(&entry("appended", vec![])).unwrap();
1301
1302        let serialized = m.to_bytes().unwrap();
1303        let reparsed = Manifest::from_bytes(serialized).unwrap();
1304
1305        assert_eq!(reparsed.entries_count(), 2);
1306        assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
1307        let entries: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1308        assert_eq!(entries[0].sequence, 0);
1309        assert_eq!(entries[1].sequence, 1);
1310    }
1311
1312    #[test]
1313    fn should_write_correct_footer_count() {
1314        let base = Manifest::from_entries(&[entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])]);
1315        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1316        m.append(&entry("c", vec![])).unwrap();
1317        m.append(&entry("d", vec![])).unwrap();
1318        m.append(&entry("e", vec![])).unwrap();
1319
1320        let bytes = m.to_bytes().unwrap();
1321
1322        let footer_start = bytes.len() - FOOTER_SIZE;
1323        let count = u32::from_le_bytes(bytes[footer_start..footer_start + 4].try_into().unwrap());
1324        let next_seq = u64::from_le_bytes(
1325            bytes[footer_start + 4..footer_start + 12]
1326                .try_into()
1327                .unwrap(),
1328        );
1329        let epoch = u64::from_le_bytes(
1330            bytes[footer_start + 12..footer_start + 20]
1331                .try_into()
1332                .unwrap(),
1333        );
1334        let version = u16::from_le_bytes(bytes[footer_start + 20..].try_into().unwrap());
1335        assert_eq!(count, 5);
1336        assert_eq!(next_seq, 5);
1337        assert_eq!(epoch, 0);
1338        assert_eq!(version, MANIFEST_VERSION);
1339    }
1340
1341    #[test]
1342    fn should_round_trip_from_entries_to_bytes_from_bytes() {
1343        let entries = vec![
1344            entry_seq(0, "a", vec![meta(0, 10, "m1")]),
1345            entry_seq(1, "b", vec![meta(0, 20, "m2")]),
1346        ];
1347        let original = Manifest::from_entries(&entries);
1348
1349        let reparsed = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1350
1351        assert_eq!(reparsed.entries_count(), 2);
1352        let decoded: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1353        assert_eq!(decoded[0].sequence, 0);
1354        assert_eq!(decoded[0].location, "a");
1355        assert_eq!(decoded[0].metadata, vec![meta(0, 10, "m1")]);
1356        assert_eq!(decoded[1].sequence, 1);
1357        assert_eq!(decoded[1].location, "b");
1358        assert_eq!(decoded[1].metadata, vec![meta(0, 20, "m2")]);
1359    }
1360
1361    #[test]
1362    fn should_round_trip_append_serialize_reparse() {
1363        let mut m = Manifest::empty();
1364        m.append(&entry("x", vec![meta(0, 100, "data")])).unwrap();
1365        m.append(&entry("y", vec![meta(0, 200, "more")])).unwrap();
1366
1367        let reparsed = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1368
1369        assert_eq!(reparsed.entries_count(), 2);
1370        assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);
1371    }
1372
1373    #[test]
1374    fn should_chain_serialize_reparse_append() {
1375        let original = Manifest::from_entries(&[entry_seq(0, "a", vec![])]);
1376        let mut m = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1377        m.append(&entry("b", vec![])).unwrap();
1378
1379        let mut m2 = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1380        m2.append(&entry("c", vec![])).unwrap();
1381
1382        let final_m = Manifest::from_bytes(m2.to_bytes().unwrap()).unwrap();
1383
1384        assert_eq!(final_m.entries_count(), 3);
1385        assert_eq!(collect_locations(&final_m), vec!["a", "b", "c"]);
1386        let entries: Vec<QueueEntry> = final_m.iter().map(|e| e.unwrap()).collect();
1387        assert_eq!(entries[2].sequence, 2);
1388    }
1389
1390    #[test]
1391    fn should_dequeue_entries_through_sequence() {
1392        let mut m = Manifest::empty();
1393        for _ in 0..5 {
1394            m.append(&entry("loc", vec![])).unwrap();
1395        }
1396
1397        let removed = m.dequeue(2).unwrap();
1398
1399        assert_eq!(removed.len(), 3);
1400        assert_eq!(removed[0].sequence, 0);
1401        assert_eq!(removed[1].sequence, 1);
1402        assert_eq!(removed[2].sequence, 2);
1403        assert_eq!(m.entries_count(), 2);
1404        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1405        assert_eq!(remaining[0].sequence, 3);
1406        assert_eq!(remaining[1].sequence, 4);
1407        assert_eq!(m.next_sequence, 5);
1408    }
1409
1410    #[test]
1411    fn should_dequeue_all_entries() {
1412        let mut m = Manifest::empty();
1413        for _ in 0..3 {
1414            m.append(&entry("loc", vec![])).unwrap();
1415        }
1416
1417        let removed = m.dequeue(2).unwrap();
1418
1419        assert_eq!(removed.len(), 3);
1420        assert!(m.is_empty());
1421        assert_eq!(m.next_sequence, 3);
1422    }
1423
1424    #[test]
1425    fn should_dequeue_nothing_when_sequence_below_first() {
1426        let entries = vec![
1427            entry_seq(5, "a", vec![]),
1428            entry_seq(6, "b", vec![]),
1429            entry_seq(7, "c", vec![]),
1430        ];
1431        let mut m = Manifest::from_entries(&entries);
1432
1433        let removed = m.dequeue(3).unwrap();
1434
1435        assert!(removed.is_empty());
1436        assert_eq!(m.entries_count(), 3);
1437    }
1438
1439    #[test]
1440    fn should_append_after_dequeue() {
1441        let mut m = Manifest::empty();
1442        for _ in 0..3 {
1443            m.append(&entry("loc", vec![])).unwrap();
1444        }
1445
1446        m.dequeue(0).unwrap();
1447
1448        assert_eq!(m.entries_count(), 2);
1449        let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1450        assert_eq!(remaining[0].sequence, 1);
1451        assert_eq!(remaining[1].sequence, 2);
1452
1453        m.append(&entry("new", vec![])).unwrap();
1454        let all: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1455        assert_eq!(all.len(), 3);
1456        assert_eq!(all[2].sequence, 3);
1457    }
1458
1459    /// Serialize a single entry into raw bytes (without footer).
1460    fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1461        let mut buf = BytesMut::new();
1462        Manifest::encode_entry(&mut buf, entry).unwrap();
1463        buf.to_vec()
1464    }
1465
1466    /// Wrap raw entry bytes with a manifest footer (entry_count=1) so that
1467    /// `Manifest::from_bytes` + `.iter()` exercises `decode_entry`.
1468    fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1469        let mut buf = BytesMut::new();
1470        buf.extend_from_slice(entry_bytes);
1471        buf.put_u32_le(1); // entry_count
1472        buf.put_u64_le(1); // next_sequence
1473        buf.put_u64_le(0); // epoch
1474        buf.put_u16_le(MANIFEST_VERSION);
1475        Manifest::from_bytes(buf.freeze()).unwrap()
1476    }
1477
1478    /// Offset of metadata_count inside the encoded entry (for location "a").
1479    /// Layout: entry_len(4) + sequence(8) + location_len(2) + location(1)
1480    const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;
1481
1482    /// Build an entry with no metadata, then corrupt metadata_count to `count`
1483    /// and extend the buffer with `extra_bytes` after metadata_count to simulate
1484    /// partial metadata. Also patches entry_len to match the new total size.
1485    fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
1486        let e = QueueEntry {
1487            sequence: 1,
1488            location: "a".to_string(),
1489            metadata: vec![],
1490        };
1491        let mut raw = encode_entry_bytes(&e);
1492        // Overwrite metadata_count
1493        raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
1494        // Append extra bytes (partial metadata fields)
1495        raw.extend_from_slice(extra_bytes);
1496        // Patch entry_len to cover the full buffer after the 4-byte prefix
1497        let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
1498        raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
1499        raw
1500    }
1501
1502    #[test]
1503    fn should_reject_trailing_bytes_before_footer() {
1504        // Build a valid entry, then add garbage bytes before the footer.
1505        let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
1506        raw.extend_from_slice(&[0xFFu8; 5]); // trailing garbage
1507
1508        let manifest = manifest_from_raw_entry(&raw);
1509        let items: Vec<Result<QueueEntry>> = manifest.iter().collect();
1510        assert_eq!(items.len(), 2);
1511        assert!(items[0].is_ok());
1512        let err = items[1].as_ref().unwrap_err();
1513        assert!(
1514            err.to_string().contains("did not consume all bytes"),
1515            "got: {}",
1516            err
1517        );
1518    }
1519
1520    #[test]
1521    fn should_reject_entry_with_entry_len_below_minimum() {
1522        // entry_len too small: less than SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE (14)
1523        let bad_entry_len = (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE - 1) as u32;
1524        let mut raw = Vec::new();
1525        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1526        raw.extend_from_slice(&[0u8; 13]); // enough raw bytes to not truncate
1527
1528        let manifest = manifest_from_raw_entry(&raw);
1529        let err = manifest.iter().next().unwrap().unwrap_err();
1530        assert!(err.to_string().contains(
1531            "entry length 13 is less than minimum entry length 14 for the length of the location 0"
1532        ));
1533    }
1534
1535    #[test]
1536    fn should_reject_entry_with_entry_len_below_minimum_for_location() {
1537        let location = "abc";
1538        // entry_len covers fixed fields but not the full location
1539        let bad_entry_len =
1540            (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + location.len() - 1) as u32;
1541        let mut raw = Vec::new();
1542        raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1543        raw.extend_from_slice(&0u64.to_le_bytes()); // sequence
1544        raw.extend_from_slice(&(location.len() as u16).to_le_bytes()); // location_len
1545        raw.extend_from_slice(&[0u8; 20]); // padding so entry doesn't extend beyond data
1546
1547        let manifest = manifest_from_raw_entry(&raw);
1548        let err = manifest.iter().next().unwrap().unwrap_err();
1549        assert!(
1550            err.to_string()
1551                .contains("entry length 16 is less than minimum entry length 17"),
1552            "got: {}",
1553            err
1554        );
1555    }
1556
1557    #[test]
1558    fn should_reject_entry_truncated_before_entry_len() {
1559        // Data too short to even read entry_len (need 4 bytes, provide 2)
1560        let manifest = manifest_from_raw_entry(&[0u8; 2]);
1561        let err = manifest.iter().next().unwrap().unwrap_err();
1562        assert!(
1563            matches!(&err, Error::Serialization(msg) if msg.contains("entry length field does not fit")),
1564            "unexpected error: {err}"
1565        );
1566    }
1567
1568    #[test]
1569    fn should_reject_entry_truncated_before_metadata_start_index() {
1570        // metadata_count = 1 but no metadata bytes at all
1571        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &[]));
1572        let err = manifest.iter().next().unwrap().unwrap_err();
1573        assert!(
1574            matches!(&err, Error::Serialization(msg) if msg.contains("start index field does not fit")),
1575            "unexpected error: {err}"
1576        );
1577    }
1578
1579    #[test]
1580    fn should_reject_entry_truncated_before_metadata_ingestion_time() {
1581        // metadata_count = 1, only start_index present (4 bytes)
1582        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &0u32.to_le_bytes()));
1583        let err = manifest.iter().next().unwrap().unwrap_err();
1584        assert!(
1585            matches!(&err, Error::Serialization(msg) if msg.contains("ingestion time field does not fit")),
1586            "unexpected error: {err}"
1587        );
1588    }
1589
1590    #[test]
1591    fn should_reject_entry_truncated_before_metadata_length() {
1592        // metadata_count = 1, start_index + ingestion_time present, but no m_len
1593        let mut extra = Vec::new();
1594        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1595        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1596        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1597        let err = manifest.iter().next().unwrap().unwrap_err();
1598        assert!(
1599            matches!(&err, Error::Serialization(msg) if msg.contains("metadata length field does not fit")),
1600            "unexpected error: {err}"
1601        );
1602    }
1603
1604    #[test]
1605    fn should_reject_entry_truncated_before_metadata_payload() {
1606        // metadata_count = 1, all fixed fields present, m_len says 10 but only 2 bytes follow
1607        let mut extra = Vec::new();
1608        extra.extend_from_slice(&0u32.to_le_bytes()); // start_index
1609        extra.extend_from_slice(&0i64.to_le_bytes()); // ingestion_time_ms
1610        extra.extend_from_slice(&10u32.to_le_bytes()); // m_len = 10
1611        extra.extend_from_slice(&[0xAB, 0xCD]); // only 2 payload bytes
1612        let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1613        let err = manifest.iter().next().unwrap().unwrap_err();
1614        assert!(
1615            matches!(&err, Error::Serialization(msg) if msg.contains("metadata has less bytes than set")),
1616            "unexpected error: {err}"
1617        );
1618    }
1619
1620    #[tokio::test]
1621    async fn should_reject_location_exceeding_u16_max() {
1622        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1623        let producer =
1624            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1625
1626        let long_location = "x".repeat(u16::MAX as usize + 1);
1627        let result = producer.enqueue(long_location, vec![]).await;
1628        assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1629    }
1630}