1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7 Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// Format version written as the last field of the manifest footer; `Manifest::from_bytes`
/// rejects any other value.
const MANIFEST_VERSION: u16 = 1;
/// Sentinel epoch a `QueueConsumer` holds until `initialize` succeeds; `initialize` never
/// assigns this value to a manifest.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;
/// Width of the `u32` length prefix that precedes every encoded entry body.
const ENTRY_LEN_SIZE: usize = 4;
/// Width of the `u16` location length field inside an entry.
const LOCATION_LEN_SIZE: usize = 2;
/// Width of the `i64` ingestion time field of a metadata record.
const INGESTION_TIME_MS_SIZE: usize = 8;
/// Width of the `u32` payload length field of a metadata record.
const METADATA_LEN_SIZE: usize = 4;
/// Width of the `u32` start index field of a metadata record.
const START_INDEX_SIZE: usize = 4;
/// Width of the `u32` metadata record count inside an entry.
const METADATA_COUNT_SIZE: usize = 4;
/// Width of the `u32` entry count stored in the footer.
const ENTRIES_COUNT_SIZE: usize = 4;
/// Width of a `u64` sequence number (per entry, and "next sequence" in the footer).
const SEQUENCE_SIZE: usize = 8;
/// Width of the `u64` epoch stored in the footer.
const EPOCH_SIZE: usize = 8;
/// Width of the `u16` format version stored in the footer.
const VERSION_SIZE: usize = 2;
/// Total footer size. The footer is laid out as: entry count, next sequence, epoch, version
/// (all little-endian) and occupies the last bytes of a serialized manifest.
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
/// One metadata record attached to a queue entry.
///
/// Each record is serialized inside its entry as `start_index` (u32), `ingestion_time_ms`
/// (i64) and a `u32`-length-prefixed `payload`, all little-endian.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    /// NOTE(review): presumably the index of the first item this record applies to within the
    /// object at the entry's location — confirm against the producer of these records.
    pub start_index: u32,
    /// NOTE(review): name suggests milliseconds (likely since the Unix epoch) — confirm.
    pub ingestion_time_ms: i64,
    /// Opaque bytes; this file only copies them in and out of the manifest.
    pub payload: Bytes,
}
36
37#[derive(Debug, Clone)]
38pub(crate) struct QueueEntry {
39 pub(crate) sequence: u64,
40 pub(crate) location: String,
41 pub(crate) metadata: Vec<Metadata>,
42}
43
44impl QueueEntry {
45 fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46 if location.len() > u16::MAX as usize {
47 return Err(Error::InvalidInput(format!(
48 "location length {} exceeds u16::MAX",
49 location.len()
50 )));
51 }
52 if metadata.len() > u32::MAX as usize {
53 return Err(Error::InvalidInput(format!(
54 "metadata count {} exceeds u32::MAX",
55 metadata.len()
56 )));
57 }
58 Ok(Self {
59 sequence: 0,
60 location,
61 metadata,
62 })
63 }
64
65 fn clone_with_sequence(&self, sequence: u64) -> Self {
66 Self {
67 sequence,
68 ..self.clone()
69 }
70 }
71}
72
/// In-memory view of a serialized queue manifest.
///
/// The serialized form is a run of length-prefixed entries followed by a fixed-size footer
/// (entry count, next sequence, epoch, version).
#[derive(Debug, Clone)]
pub(crate) struct Manifest {
    /// Serialized base manifest: encoded entries followed by the footer.
    data: Bytes,
    /// Encoded entries added via `append` that are not yet merged into `data`.
    appended: BytesMut,
    /// Number of entries encoded in `appended`.
    appended_count: usize,
    /// Sequence number the next appended entry will receive.
    next_sequence: u64,
    /// Epoch read from (or to be written to) the footer.
    epoch: u64,
}
81
82impl Manifest {
83 fn empty() -> Self {
85 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
86 buf.put_u32_le(0);
87 buf.put_u64_le(0);
88 buf.put_u64_le(0);
89 buf.put_u16_le(MANIFEST_VERSION);
90 Self {
91 data: buf.freeze(),
92 appended: BytesMut::new(),
93 appended_count: 0,
94 next_sequence: 0,
95 epoch: 0,
96 }
97 }
98
99 pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
101 if data.is_empty() {
102 return Err(Error::Serialization(
103 "queue manifest data must not be empty".to_string(),
104 ));
105 }
106 if data.len() < FOOTER_SIZE {
107 return Err(Error::Serialization(
108 "queue manifest too short for footer".to_string(),
109 ));
110 }
111 let version_start = data.len() - VERSION_SIZE;
112 let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
113 if version != MANIFEST_VERSION {
114 return Err(Error::Serialization(format!(
115 "unsupported queue manifest version: {}",
116 version
117 )));
118 }
119 let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
120 let epoch = u64::from_le_bytes(
121 data[epoch_start..epoch_start + EPOCH_SIZE]
122 .try_into()
123 .unwrap(),
124 );
125 let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
126 let next_sequence = u64::from_le_bytes(
127 data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
128 .try_into()
129 .unwrap(),
130 );
131 Ok(Self {
132 data,
133 appended: BytesMut::new(),
134 appended_count: 0,
135 next_sequence,
136 epoch,
137 })
138 }
139
140 #[cfg(test)]
142 fn from_entries(entries: &[QueueEntry]) -> Self {
143 let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
144 let mut buf = BytesMut::new();
145 for entry in entries {
146 Self::encode_entry(&mut buf, entry).unwrap();
147 }
148 buf.put_u32_le(entries.len() as u32);
149 buf.put_u64_le(next_sequence);
150 buf.put_u64_le(0);
151 buf.put_u16_le(MANIFEST_VERSION);
152 Self {
153 data: buf.freeze(),
154 appended: BytesMut::new(),
155 appended_count: 0,
156 next_sequence,
157 epoch: 0,
158 }
159 }
160
161 fn entries_count(&self) -> usize {
163 let base = self.existing_entries_count();
164 base + self.appended_count
165 }
166
167 #[cfg(test)]
169 fn is_empty(&self) -> bool {
170 self.entries_count() == 0
171 }
172
173 pub(crate) fn iter(&self) -> ManifestIter<'_> {
175 let base_count = self.existing_entries_count();
176 let entries_end = if self.data.is_empty() {
177 0
178 } else {
179 self.data.len() - FOOTER_SIZE
180 };
181 ManifestIter {
182 data: &self.data,
183 offset: 0,
184 remaining: base_count,
185 entries_end,
186 appended: &self.appended,
187 appended_offset: 0,
188 appended_remaining: self.appended_count,
189 }
190 }
191
192 fn existing_entries_count(&self) -> usize {
193 if self.data.is_empty() {
194 0
195 } else {
196 let footer_start = self.data.len() - FOOTER_SIZE;
197 u32::from_le_bytes(
198 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
199 .try_into()
200 .unwrap(),
201 ) as usize
202 }
203 }
204
205 fn append(&mut self, entry: &QueueEntry) -> Result<()> {
209 let sequenced = entry.clone_with_sequence(self.next_sequence);
210 Self::encode_entry(&mut self.appended, &sequenced)?;
211 self.next_sequence += 1;
212 self.appended_count += 1;
213 Ok(())
214 }
215
216 fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
221 let next_seq = self.next_sequence;
222 let epoch = self.epoch;
223
224 let base_count = self.existing_entries_count();
225 let entries_end = if self.data.is_empty() {
226 0
227 } else {
228 self.data.len() - FOOTER_SIZE
229 };
230
231 let (mut removed, remaining_base_start, remaining_base_count) =
232 split_entries(&self.data, base_count, entries_end, through_sequence)?;
233
234 let appended_end = self.appended.len();
235 let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
236 &self.appended,
237 self.appended_count,
238 appended_end,
239 through_sequence,
240 )?;
241 removed.extend(appended_removed);
242
243 let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
244 let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
245 let total_remaining = remaining_base_count + remaining_appended_count;
246
247 let mut buf = BytesMut::with_capacity(
248 remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
249 );
250 buf.extend_from_slice(remaining_base_bytes);
251 buf.extend_from_slice(remaining_appended_bytes);
252 buf.put_u32_le(total_remaining);
253 buf.put_u64_le(next_seq);
254 buf.put_u64_le(epoch);
255 buf.put_u16_le(MANIFEST_VERSION);
256
257 self.data = buf.freeze();
258 self.appended = BytesMut::new();
259 self.appended_count = 0;
260 self.next_sequence = next_seq;
261 self.epoch = epoch;
262
263 Ok(removed)
264 }
265
266 fn set_epoch(&mut self, epoch: u64) {
268 self.epoch = epoch;
269 let mut buf = BytesMut::from(self.data.as_ref());
270 let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
271 buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
272 self.data = buf.freeze();
273 }
274
275 fn to_bytes(&self) -> Result<Bytes> {
279 if self.appended.is_empty() {
280 return Ok(self.data.clone());
281 }
282 let (prefix, base_count) = if self.data.is_empty() {
283 (&[] as &[u8], 0u32)
284 } else {
285 let footer_start = self.data.len() - FOOTER_SIZE;
286 let count = u32::from_le_bytes(
287 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
288 .try_into()
289 .unwrap(),
290 );
291 (&self.data[..footer_start], count)
292 };
293 let total_count: u32 = base_count
294 .checked_add(self.appended_count as u32)
295 .ok_or_else(|| {
296 Error::Serialization(format!(
297 "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
298 base_count, self.appended_count
299 ))
300 })?;
301 let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
302 buf.extend_from_slice(prefix);
303 buf.extend_from_slice(&self.appended);
304 buf.put_u32_le(total_count);
305 buf.put_u64_le(self.next_sequence);
306 buf.put_u64_le(self.epoch);
307 buf.put_u16_le(MANIFEST_VERSION);
308 Ok(buf.freeze())
309 }
310
311 fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
312 debug_assert!(entry.location.len() <= u16::MAX as usize);
313 let metadata_size: usize = METADATA_COUNT_SIZE
314 + entry
315 .metadata
316 .iter()
317 .map(|m| {
318 START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
319 })
320 .sum::<usize>();
321 let entry_body_len =
322 SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
323 debug_assert!(entry_body_len <= u32::MAX as usize);
324 buf.put_u32_le(entry_body_len as u32);
325 buf.put_u64_le(entry.sequence);
326 buf.put_u16_le(entry.location.len() as u16);
327 buf.extend_from_slice(entry.location.as_bytes());
328 debug_assert!(entry.metadata.len() <= u32::MAX as usize);
329 buf.put_u32_le(entry.metadata.len() as u32);
330 for m in &entry.metadata {
331 if m.payload.len() > u32::MAX as usize {
332 return Err(Error::InvalidInput(format!(
333 "metadata payload size {} exceeds u32::MAX",
334 m.payload.len()
335 )));
336 }
337 buf.put_u32_le(m.start_index);
338 buf.put_i64_le(m.ingestion_time_ms);
339 buf.put_u32_le(m.payload.len() as u32);
340 buf.extend_from_slice(&m.payload);
341 }
342 Ok(())
343 }
344}
345
346fn split_entries(
350 data: &[u8],
351 count: usize,
352 end: usize,
353 through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355 let mut removed = Vec::new();
356 let mut offset = 0usize;
357
358 for i in 0..count {
359 let entry_start = offset;
360 let entry = decode_entry(data, &mut offset, end)?;
361
362 if entry.sequence <= through_sequence {
363 removed.push(entry);
364 } else {
365 return Ok((removed, entry_start, (count - i) as u32));
366 }
367 }
368
369 Ok((removed, end, 0))
370}
371
372fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374 if *offset + ENTRY_LEN_SIZE > end {
375 return Err(Error::Serialization(
376 "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377 ));
378 }
379
380 let entry_len =
381 u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382 *offset += ENTRY_LEN_SIZE;
383
384 if *offset + entry_len > end {
385 return Err(Error::Serialization(
386 "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387 ));
388 }
389
390 let entry_end = *offset + entry_len;
391
392 let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393 *offset += SEQUENCE_SIZE;
394
395 let location_len = u16::from_le_bytes(
396 data[*offset..*offset + LOCATION_LEN_SIZE]
397 .try_into()
398 .unwrap(),
399 ) as usize;
400 *offset += LOCATION_LEN_SIZE;
401
402 let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403 if entry_len < min_entry_len {
404 return Err(Error::Serialization(format!(
405 "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406 entry_len, min_entry_len, location_len
407 )));
408 }
409
410 let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411 .map_err(|e| Error::Serialization(e.to_string()))?;
412 *offset += location_len;
413
414 let metadata_count = u32::from_le_bytes(
415 data[*offset..*offset + METADATA_COUNT_SIZE]
416 .try_into()
417 .unwrap(),
418 ) as usize;
419 *offset += METADATA_COUNT_SIZE;
420
421 let mut metadata = Vec::with_capacity(metadata_count);
422 for _ in 0..metadata_count {
423 if *offset + START_INDEX_SIZE > end {
424 return Err(Error::Serialization(
425 "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426 ));
427 }
428 let start_index = u32::from_le_bytes(
429 data[*offset..*offset + START_INDEX_SIZE]
430 .try_into()
431 .unwrap(),
432 );
433 *offset += START_INDEX_SIZE;
434
435 if *offset + INGESTION_TIME_MS_SIZE > end {
436 return Err(Error::Serialization(
437 "queue entry corrupt: size of ingestion time field does not fit in entry"
438 .to_string(),
439 ));
440 }
441 let ingestion_time_ms = i64::from_le_bytes(
442 data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443 .try_into()
444 .unwrap(),
445 );
446 *offset += INGESTION_TIME_MS_SIZE;
447
448 if *offset + METADATA_LEN_SIZE > end {
449 return Err(Error::Serialization(
450 "queue entry corrupt: size of metadata length field does not fit in entry"
451 .to_string(),
452 ));
453 }
454 let m_len = u32::from_le_bytes(
455 data[*offset..*offset + METADATA_LEN_SIZE]
456 .try_into()
457 .unwrap(),
458 ) as usize;
459 *offset += METADATA_LEN_SIZE;
460
461 if *offset + m_len > end {
462 return Err(Error::Serialization(
463 "queue entry corrupt: metadata has less bytes than set in the metadata length"
464 .to_string(),
465 ));
466 }
467 metadata.push(Metadata {
468 start_index,
469 ingestion_time_ms,
470 payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471 });
472 *offset += m_len;
473 }
474
475 *offset = entry_end;
476
477 Ok(QueueEntry {
478 sequence,
479 location,
480 metadata,
481 })
482}
483
484pub(crate) struct ManifestIter<'a> {
486 data: &'a [u8],
487 offset: usize,
488 remaining: usize,
489 entries_end: usize,
490 appended: &'a [u8],
491 appended_offset: usize,
492 appended_remaining: usize,
493}
494
495impl Iterator for ManifestIter<'_> {
496 type Item = Result<QueueEntry>;
497
498 fn next(&mut self) -> Option<Self::Item> {
499 if self.remaining > 0 {
500 self.remaining -= 1;
501 Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502 } else if self.appended_remaining > 0 {
503 self.appended_remaining -= 1;
504 Some(decode_entry(
505 self.appended,
506 &mut self.appended_offset,
507 self.appended.len(),
508 ))
509 } else if self.offset != self.entries_end {
510 let err = Some(Err(Error::Serialization(format!(
511 "base entries did not consume all bytes: offset {} != entries_end {}",
512 self.offset, self.entries_end
513 ))));
514 self.offset = self.entries_end;
515 err
516 } else {
517 None
518 }
519 }
520}
521
/// Outcome of a failed manifest write.
enum ManifestWriteError {
    /// The conditional put was rejected (precondition failed or object already exists);
    /// callers in this file re-read the manifest and retry.
    Conflict,
    /// Any other failure; callers in this file return it to their caller.
    Fatal(Error),
}
526
527#[derive(Clone)]
528struct ManifestStore {
529 object_store: Arc<dyn ObjectStore>,
530 manifest_path: String,
531}
532
533impl ManifestStore {
534 async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
535 let path = Path::from(self.manifest_path.as_str());
536 match self.object_store.get(&path).await {
537 Ok(result) => {
538 let version = UpdateVersion {
539 e_tag: result.meta.e_tag.clone(),
540 version: result.meta.version.clone(),
541 };
542 let bytes = result
543 .bytes()
544 .await
545 .map_err(|e| Error::Storage(e.to_string()))?;
546 let manifest = Manifest::from_bytes(bytes)?;
547 Ok((manifest, Some(version)))
548 }
549 Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
550 Err(e) => Err(Error::Storage(e.to_string())),
551 }
552 }
553
554 async fn write(
555 &self,
556 manifest: &Manifest,
557 version: Option<UpdateVersion>,
558 ) -> std::result::Result<(), ManifestWriteError> {
559 let path = Path::from(self.manifest_path.as_str());
560 let put_mode = match version {
561 Some(v) => PutMode::Update(v),
562 None => PutMode::Create,
563 };
564 let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;
565
566 match self
567 .object_store
568 .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
569 .await
570 {
571 Ok(_) => Ok(()),
572 Err(ObjectStoreError::Precondition { .. })
573 | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
574 Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
575 }
576 }
577}
578
/// Counts manifest write attempts and how many of them ended in a conflict.
struct ConflictCounter {
    /// Number of recorded write attempts.
    write_count: AtomicU64,
    /// Number of recorded conflicts.
    conflict_count: AtomicU64,
}

impl ConflictCounter {
    /// Creates a counter with both tallies at zero.
    fn new() -> Self {
        let write_count = AtomicU64::new(0);
        let conflict_count = AtomicU64::new(0);
        Self {
            write_count,
            conflict_count,
        }
    }

    /// Records one write attempt.
    fn record_write(&self) {
        self.write_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one conflicting write.
    fn record_conflict(&self) {
        self.conflict_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Conflicts as a percentage of recorded writes, capped at 100.0; 0.0 when no write has
    /// been recorded.
    fn conflict_rate(&self) -> f64 {
        match self.write_count.load(Ordering::Relaxed) {
            0 => 0.0,
            writes => {
                let conflicts = self.conflict_count.load(Ordering::Relaxed);
                let percent = conflicts as f64 / writes as f64 * 100.0;
                f64::min(percent, 100.0)
            }
        }
    }
}
610
611pub struct QueueProducer {
617 manifest_store: ManifestStore,
618 counter: ConflictCounter,
619}
620
621impl QueueProducer {
622 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
624 Self {
625 manifest_store: ManifestStore {
626 object_store,
627 manifest_path,
628 },
629 counter: ConflictCounter::new(),
630 }
631 }
632
633 pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
639 let entry = QueueEntry::new(location, metadata)?;
640 loop {
641 let (mut manifest, version) = self.manifest_store.read().await?;
642 manifest.append(&entry)?;
643 self.counter.record_write();
644 match self.manifest_store.write(&manifest, version).await {
645 Ok(()) => return Ok(()),
646 Err(ManifestWriteError::Conflict) => {
647 self.counter.record_conflict();
648 continue;
649 }
650 Err(ManifestWriteError::Fatal(e)) => return Err(e),
651 }
652 }
653 }
654
655 pub fn conflict_rate(&self) -> f64 {
657 self.counter.conflict_rate()
658 }
659}
660
661pub struct QueueConsumer {
670 manifest_store: ManifestStore,
671 epoch: AtomicU64,
672 counter: ConflictCounter,
673 queue_len: AtomicU64,
674}
675
676impl QueueConsumer {
677 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
681 Self {
682 manifest_store: ManifestStore {
683 object_store,
684 manifest_path,
685 },
686 epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
687 counter: ConflictCounter::new(),
688 queue_len: AtomicU64::new(0),
689 }
690 }
691
692 pub async fn initialize(&self) -> Result<()> {
695 loop {
696 let (mut manifest, version) = self.read_manifest().await?;
697 let mut new_epoch = manifest.epoch.wrapping_add(1);
698 if new_epoch == UNINITIALIZED_EPOCH {
699 new_epoch = new_epoch.wrapping_add(1);
700 }
701 manifest.set_epoch(new_epoch);
702 match self.write_manifest(&manifest, version).await {
703 Ok(()) => {
704 self.epoch.store(new_epoch, Ordering::Relaxed);
705 return Ok(());
706 }
707 Err(ManifestWriteError::Conflict) => {
708 self.counter.record_conflict();
709 continue;
710 }
711 Err(ManifestWriteError::Fatal(e)) => return Err(e),
712 }
713 }
714 }
715
716 pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
719 let (manifest, _) = self.read_manifest().await?;
720 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
721 return Err(Error::Fenced);
722 }
723 manifest.iter().next().transpose()
724 }
725
726 pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
729 let (manifest, _) = self.read_manifest().await?;
730 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
731 return Err(Error::Fenced);
732 }
733 manifest
734 .iter()
735 .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
736 .transpose()
737 }
738
739 pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
742 loop {
743 let (mut manifest, version) = self.read_manifest().await?;
744 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
745 return Err(Error::Fenced);
746 }
747 let removed = manifest.dequeue(through_sequence)?;
748 match self.write_manifest(&manifest, version).await {
749 Ok(()) => return Ok(removed),
750 Err(ManifestWriteError::Conflict) => {
751 self.counter.record_conflict();
752 continue;
753 }
754 Err(ManifestWriteError::Fatal(e)) => return Err(e),
755 }
756 }
757 }
758
759 pub fn len(&self) -> usize {
761 self.queue_len.load(Ordering::Relaxed) as usize
762 }
763
764 async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
765 let result = self.manifest_store.read().await?;
766 self.queue_len
767 .store(result.0.entries_count() as u64, Ordering::Relaxed);
768 Ok(result)
769 }
770
771 async fn write_manifest(
772 &self,
773 manifest: &Manifest,
774 version: Option<UpdateVersion>,
775 ) -> std::result::Result<(), ManifestWriteError> {
776 self.counter.record_write();
777 let result = self.manifest_store.write(manifest, version).await;
778 if result.is_ok() {
779 self.queue_len
780 .store(manifest.entries_count() as u64, Ordering::Relaxed);
781 }
782 result
783 }
784
785 pub fn conflict_rate(&self) -> f64 {
787 self.counter.conflict_rate()
788 }
789}
790
791#[cfg(test)]
792mod tests {
793 use super::*;
794 use slatedb::object_store::memory::InMemory;
795
    /// Manifest location used by the tests in this module.
    const TEST_MANIFEST_PATH: &str = "test/manifest";
797
798 async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
799 let path = Path::from(path);
800 let result = store.get(&path).await.unwrap();
801 let bytes = result.bytes().await.unwrap();
802 Manifest::from_bytes(bytes).unwrap()
803 }
804
805 #[tokio::test]
806 async fn should_initialize_consumer_and_increment_epoch() {
807 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
808 let consumer =
809 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
810
811 consumer.initialize().await.unwrap();
812
813 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
814 assert_eq!(manifest.epoch, 1);
815 }
816
817 #[tokio::test]
818 async fn should_peek_none_when_queue_is_empty() {
819 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
820 let consumer =
821 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
822 consumer.initialize().await.unwrap();
823
824 let result = consumer.peek().await.unwrap();
825 assert!(result.is_none());
826 }
827
828 #[tokio::test]
829 async fn should_read_entry_by_sequence() {
830 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
831 let producer =
832 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
833
834 producer
835 .enqueue("a.batch".to_string(), vec![])
836 .await
837 .unwrap();
838 producer
839 .enqueue("b.batch".to_string(), vec![])
840 .await
841 .unwrap();
842 producer
843 .enqueue("c.batch".to_string(), vec![])
844 .await
845 .unwrap();
846
847 let consumer =
848 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
849 consumer.initialize().await.unwrap();
850
851 let entry = consumer.read(1).await.unwrap().unwrap();
852 assert_eq!(entry.location, "b.batch");
853 assert_eq!(entry.sequence, 1);
854 }
855
856 #[tokio::test]
857 async fn should_read_none_for_missing_sequence() {
858 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
859 let producer =
860 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
861
862 producer
863 .enqueue("a.batch".to_string(), vec![])
864 .await
865 .unwrap();
866
867 let consumer =
868 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
869 consumer.initialize().await.unwrap();
870
871 let result = consumer.read(99).await.unwrap();
872 assert!(result.is_none());
873 }
874
875 #[tokio::test]
876 async fn should_fence_old_consumer_on_peek() {
877 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
878 let consumer_a =
879 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
880 consumer_a.initialize().await.unwrap();
881
882 let consumer_b =
883 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
884 consumer_b.initialize().await.unwrap();
885
886 let result = consumer_a.peek().await;
887 assert!(matches!(result, Err(Error::Fenced)));
888 }
889
890 #[tokio::test]
891 async fn should_fence_old_consumer_on_dequeue() {
892 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
893 let consumer_a =
894 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
895 consumer_a.initialize().await.unwrap();
896
897 let consumer_b =
898 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
899 consumer_b.initialize().await.unwrap();
900
901 let result = consumer_a.dequeue(0).await;
902 assert!(matches!(result, Err(Error::Fenced)));
903 }
904
905 #[tokio::test]
906 async fn should_fence_uninitialized_consumer() {
907 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
908 let producer =
909 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
910
911 producer
912 .enqueue("a.batch".to_string(), vec![])
913 .await
914 .unwrap();
915
916 let consumer =
917 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
918
919 let result = consumer.peek().await;
920 assert!(matches!(result, Err(Error::Fenced)));
921 }
922
923 #[tokio::test]
924 async fn should_wrap_epoch_to_zero_at_max() {
925 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
926
927 let mut manifest = Manifest::empty();
928 manifest.set_epoch(u64::MAX - 1);
929 let path = Path::from(TEST_MANIFEST_PATH);
930 store
931 .put(
932 &path,
933 PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
934 )
935 .await
936 .unwrap();
937
938 let consumer =
939 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
940 consumer.initialize().await.unwrap();
941
942 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
943 assert_eq!(manifest.epoch, 0);
944 }
945
946 #[tokio::test]
947 async fn should_peek_first_entry_with_valid_epoch() {
948 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
949 let producer =
950 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
951
952 producer
953 .enqueue("a.batch".to_string(), vec![])
954 .await
955 .unwrap();
956 producer
957 .enqueue("b.batch".to_string(), vec![])
958 .await
959 .unwrap();
960
961 let consumer =
962 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
963 consumer.initialize().await.unwrap();
964
965 let entry = consumer.peek().await.unwrap().unwrap();
966 assert_eq!(entry.location, "a.batch");
967 }
968
969 #[tokio::test]
970 async fn should_dequeue_entries_with_valid_epoch() {
971 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
972 let producer =
973 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
974
975 producer
976 .enqueue("a.batch".to_string(), vec![])
977 .await
978 .unwrap();
979 producer
980 .enqueue("b.batch".to_string(), vec![])
981 .await
982 .unwrap();
983 producer
984 .enqueue("c.batch".to_string(), vec![])
985 .await
986 .unwrap();
987
988 let consumer =
989 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
990 consumer.initialize().await.unwrap();
991
992 let removed = consumer.dequeue(1).await.unwrap();
993 assert_eq!(removed.len(), 2);
994 assert_eq!(removed[0].location, "a.batch");
995 assert_eq!(removed[1].location, "b.batch");
996
997 let next = consumer.peek().await.unwrap().unwrap();
998 assert_eq!(next.location, "c.batch");
999 }
1000
1001 #[tokio::test]
1002 async fn should_enqueue_after_consumer_dequeue() {
1003 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1004 let producer =
1005 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1006
1007 producer
1008 .enqueue("a.batch".to_string(), vec![])
1009 .await
1010 .unwrap();
1011 producer
1012 .enqueue("b.batch".to_string(), vec![])
1013 .await
1014 .unwrap();
1015
1016 let consumer =
1017 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1018 consumer.initialize().await.unwrap();
1019
1020 consumer.dequeue(1).await.unwrap();
1021
1022 producer
1023 .enqueue("c.batch".to_string(), vec![])
1024 .await
1025 .unwrap();
1026
1027 let next = consumer.peek().await.unwrap().unwrap();
1028 assert_eq!(next.location, "c.batch");
1029 assert_eq!(next.sequence, 2);
1030 }
1031
1032 #[tokio::test]
1033 async fn should_enqueue_locations_to_manifest() {
1034 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1035 let producer =
1036 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1037
1038 producer
1039 .enqueue("path/to/file1.batch".to_string(), vec![])
1040 .await
1041 .unwrap();
1042 producer
1043 .enqueue("path/to/file2.batch".to_string(), vec![])
1044 .await
1045 .unwrap();
1046
1047 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1048 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1049 assert_eq!(
1050 locations,
1051 vec!["path/to/file1.batch", "path/to/file2.batch"]
1052 );
1053 }
1054
1055 #[tokio::test]
1056 async fn should_merge_with_existing_manifest() {
1057 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1058
1059 let existing = Manifest::from_entries(&[QueueEntry {
1060 sequence: 0,
1061 location: "existing/file.batch".to_string(),
1062 metadata: vec![],
1063 }]);
1064 let path = Path::from(TEST_MANIFEST_PATH);
1065 store
1066 .put(
1067 &path,
1068 PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1069 )
1070 .await
1071 .unwrap();
1072
1073 let producer =
1074 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1075 producer
1076 .enqueue("new/file.batch".to_string(), vec![])
1077 .await
1078 .unwrap();
1079
1080 let manifest = read_producer_manifest(&store, "test/manifest").await;
1081 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1082 assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1083 }
1084
1085 fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1086 QueueEntry::new(location.to_string(), metadata).unwrap()
1087 }
1088
1089 fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1090 QueueEntry {
1091 sequence: seq,
1092 location: location.to_string(),
1093 metadata,
1094 }
1095 }
1096
1097 fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1098 Metadata {
1099 start_index,
1100 ingestion_time_ms: time_ms,
1101 payload: Bytes::from(data.to_string()),
1102 }
1103 }
1104
1105 fn collect_locations(manifest: &Manifest) -> Vec<String> {
1106 manifest.iter().map(|e| e.unwrap().location).collect()
1107 }
1108
1109 #[test]
1110 fn should_create_empty_manifest() {
1111 let m = Manifest::empty();
1112
1113 assert_eq!(m.entries_count(), 0);
1114 assert!(m.is_empty());
1115 assert_eq!(m.epoch, 0);
1116
1117 let bytes = m.to_bytes().unwrap();
1118 assert_eq!(bytes.len(), FOOTER_SIZE);
1119 assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1120 assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1121 assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1122 assert_eq!(
1123 u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1124 MANIFEST_VERSION
1125 );
1126 }
1127
1128 #[test]
1129 fn should_parse_valid_manifest_bytes() {
1130 let entries = vec![
1131 entry_seq(0, "a", vec![meta(0, 1, "x")]),
1132 entry_seq(1, "b", vec![meta(0, 2, "y")]),
1133 ];
1134 let data = Manifest::from_entries(&entries).to_bytes().unwrap();
1135
1136 let m = Manifest::from_bytes(data).unwrap();
1137
1138 assert_eq!(m.entries_count(), 2);
1139 }
1140
1141 #[test]
1142 fn should_parse_footer_only_bytes() {
1143 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1144 buf.put_u32_le(0);
1145 buf.put_u64_le(42);
1146 buf.put_u64_le(0);
1147 buf.put_u16_le(MANIFEST_VERSION);
1148
1149 let m = Manifest::from_bytes(buf.freeze()).unwrap();
1150
1151 assert_eq!(m.entries_count(), 0);
1152 assert_eq!(m.epoch, 0);
1153
1154 let mut m = m;
1155 m.append(&entry("loc", vec![])).unwrap();
1156 let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1157 assert_eq!(entries[0].sequence, 42);
1158 }
1159
/// Parsing a zero-length buffer fails with a "must not be empty" error.
#[test]
fn should_reject_empty_bytes() {
    let result = Manifest::from_bytes(Bytes::new());

    let message = result.unwrap_err().to_string();
    assert!(message.contains("must not be empty"));
}
1166
/// A non-empty buffer one byte shorter than the footer is rejected with a
/// "too short for footer" error.
#[test]
fn should_reject_bytes_too_short_for_footer() {
    // Derive the length from FOOTER_SIZE instead of hard-coding 21, so the input
    // always sits exactly one byte below the minimum even if the footer layout changes.
    let too_short = Bytes::from(vec![0u8; FOOTER_SIZE - 1]);

    let err = Manifest::from_bytes(too_short).unwrap_err();

    assert!(err.to_string().contains("too short for footer"));
}
1173
/// A footer carrying version 99 is rejected, and the error names the offending version.
#[test]
fn should_reject_wrong_version() {
    // Footer with every field zero except the trailing u16 version, which is 99.
    let mut raw = Vec::with_capacity(FOOTER_SIZE);
    raw.extend_from_slice(&0u32.to_le_bytes());
    raw.extend_from_slice(&0u64.to_le_bytes());
    raw.extend_from_slice(&0u64.to_le_bytes());
    raw.extend_from_slice(&99u16.to_le_bytes());

    let message = Manifest::from_bytes(Bytes::from(raw))
        .unwrap_err()
        .to_string();

    assert!(message.contains("unsupported"));
    assert!(message.contains("99"));
}
1187
/// A footer carrying version 0 is rejected as unsupported.
#[test]
fn should_reject_version_zero() {
    // Footer with every field, including the trailing u16 version, set to zero.
    let mut raw = Vec::with_capacity(FOOTER_SIZE);
    raw.extend_from_slice(&0u32.to_le_bytes());
    raw.extend_from_slice(&0u64.to_le_bytes());
    raw.extend_from_slice(&0u64.to_le_bytes());
    raw.extend_from_slice(&0u16.to_le_bytes());

    let message = Manifest::from_bytes(Bytes::from(raw))
        .unwrap_err()
        .to_string();

    assert!(message.contains("unsupported"));
}
1200
/// An entry appended to an empty manifest is yielded by `iter()` with sequence 0
/// and its location and metadata intact.
#[test]
fn should_make_appended_entry_accessible_via_iter() {
    let mut manifest = Manifest::empty();
    let expected_metadata = vec![meta(0, 42, "meta")];

    manifest
        .append(&entry("loc", expected_metadata.clone()))
        .unwrap();

    let decoded = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    assert_eq!(decoded.len(), 1);
    let only = &decoded[0];
    assert_eq!(only.sequence, 0);
    assert_eq!(only.location, "loc");
    assert_eq!(only.metadata, expected_metadata);
}
1213
/// Appending to a manifest parsed from bytes keeps the parsed entry first and gives
/// the appended entry the next sequence number.
#[test]
fn should_append_to_existing_base_entries() {
    let serialized_base = Manifest::from_entries(&[entry_seq(0, "base", vec![])])
        .to_bytes()
        .unwrap();
    let mut manifest = Manifest::from_bytes(serialized_base).unwrap();

    manifest.append(&entry("appended", vec![])).unwrap();

    assert_eq!(manifest.entries_count(), 2);
    assert_eq!(collect_locations(&manifest), vec!["base", "appended"]);
    // The location check above already pins the iterator to exactly two entries.
    let sequences: Vec<u64> = manifest
        .iter()
        .map(|item| item.unwrap().sequence)
        .collect();
    assert_eq!(sequences, vec![0, 1]);
}
1228
/// Entries come back from `iter()` in the order they were appended, with
/// sequences 0, 1, 2.
#[test]
fn should_preserve_append_order() {
    let mut manifest = Manifest::empty();

    for location in ["a", "b", "c"].iter().copied() {
        manifest.append(&entry(location, vec![])).unwrap();
    }

    assert_eq!(collect_locations(&manifest), vec!["a", "b", "c"]);
    // The location check above already pins the iterator to exactly three entries.
    let sequences: Vec<u64> = manifest
        .iter()
        .map(|item| item.unwrap().sequence)
        .collect();
    assert_eq!(sequences, vec![0, 1, 2]);
}
1243
/// An entry with an empty location and no metadata decodes unchanged.
#[test]
fn should_handle_entry_with_empty_location() {
    let manifest = Manifest::from_entries(&[entry_seq(0, "", vec![])]);

    let decoded = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    let first = &decoded[0];
    assert_eq!(first.location, "");
    assert!(first.metadata.is_empty());
}
1252
/// A 1024-byte metadata payload survives encoding and decoding byte-for-byte.
#[test]
fn should_handle_entry_with_large_metadata() {
    let payload = Bytes::from(vec![0xAB_u8; 1024]);
    let large = Metadata {
        start_index: 0,
        ingestion_time_ms: 1,
        payload: payload.clone(),
    };

    let manifest = Manifest::from_entries(&[entry_seq(0, "loc", vec![large])]);

    let decoded = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    let metadata = &decoded[0].metadata;
    assert_eq!(metadata.len(), 1);
    assert_eq!(metadata[0].payload, payload);
}
1271
/// A negative `ingestion_time_ms` is preserved through encoding and decoding.
#[test]
fn should_handle_negative_ingestion_time() {
    let negative = meta(0, -1000, "");
    let manifest = Manifest::from_entries(&[entry_seq(0, "loc", vec![negative])]);

    let decoded = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    let first_metadata = &decoded[0].metadata[0];
    assert_eq!(first_metadata.ingestion_time_ms, -1000);
}
1279
/// Serializing an empty manifest yields exactly one footer: all numeric fields
/// zero and the current manifest version.
#[test]
fn should_return_footer_for_empty_manifest() {
    let manifest = Manifest::empty();

    let serialized = manifest.to_bytes().unwrap();

    assert_eq!(serialized.len(), FOOTER_SIZE);
    // Little-endian footer fields: u32 at 0..4, u64 at 4..12, u64 at 12..20, u16 at 20..22.
    let entries_count = u32::from_le_bytes(serialized[..4].try_into().unwrap());
    let next_sequence = u64::from_le_bytes(serialized[4..12].try_into().unwrap());
    let epoch = u64::from_le_bytes(serialized[12..20].try_into().unwrap());
    let version = u16::from_le_bytes(serialized[20..22].try_into().unwrap());
    assert_eq!(entries_count, 0);
    assert_eq!(next_sequence, 0);
    assert_eq!(epoch, 0);
    assert_eq!(version, MANIFEST_VERSION);
}
1295
/// Serializing a manifest that has both parsed and appended entries produces
/// bytes that reparse with both entries, in order, with sequences 0 and 1.
#[test]
fn should_merge_base_and_appended() {
    let base_bytes = Manifest::from_entries(&[entry_seq(0, "base", vec![])])
        .to_bytes()
        .unwrap();
    let mut manifest = Manifest::from_bytes(base_bytes).unwrap();
    manifest.append(&entry("appended", vec![])).unwrap();

    let merged_bytes = manifest.to_bytes().unwrap();
    let reparsed = Manifest::from_bytes(merged_bytes).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
    // The location check above already pins the iterator to exactly two entries.
    let sequences: Vec<u64> = reparsed
        .iter()
        .map(|item| item.unwrap().sequence)
        .collect();
    assert_eq!(sequences, vec![0, 1]);
}
1311
/// With two parsed and three appended entries, the serialized footer reports
/// count 5, next sequence 5, epoch 0 and the current version.
#[test]
fn should_write_correct_footer_count() {
    let base_entries = [entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])];
    let base_bytes = Manifest::from_entries(&base_entries).to_bytes().unwrap();
    let mut manifest = Manifest::from_bytes(base_bytes).unwrap();
    manifest.append(&entry("c", vec![])).unwrap();
    manifest.append(&entry("d", vec![])).unwrap();
    manifest.append(&entry("e", vec![])).unwrap();

    let serialized = manifest.to_bytes().unwrap();

    // The footer is the last FOOTER_SIZE bytes of the serialized manifest.
    let footer = &serialized[serialized.len() - FOOTER_SIZE..];
    let count = u32::from_le_bytes(footer[..4].try_into().unwrap());
    let next_seq = u64::from_le_bytes(footer[4..12].try_into().unwrap());
    let epoch = u64::from_le_bytes(footer[12..20].try_into().unwrap());
    let version = u16::from_le_bytes(footer[20..].try_into().unwrap());
    assert_eq!(count, 5);
    assert_eq!(next_seq, 5);
    assert_eq!(epoch, 0);
    assert_eq!(version, MANIFEST_VERSION);
}
1340
/// `from_entries` -> `to_bytes` -> `from_bytes` preserves each entry's sequence,
/// location and metadata.
#[test]
fn should_round_trip_from_entries_to_bytes_from_bytes() {
    let original = Manifest::from_entries(&[
        entry_seq(0, "a", vec![meta(0, 10, "m1")]),
        entry_seq(1, "b", vec![meta(0, 20, "m2")]),
    ]);

    let serialized = original.to_bytes().unwrap();
    let reparsed = Manifest::from_bytes(serialized).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    let decoded = reparsed
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();

    let first = &decoded[0];
    assert_eq!(first.sequence, 0);
    assert_eq!(first.location, "a");
    assert_eq!(first.metadata, vec![meta(0, 10, "m1")]);

    let second = &decoded[1];
    assert_eq!(second.sequence, 1);
    assert_eq!(second.location, "b");
    assert_eq!(second.metadata, vec![meta(0, 20, "m2")]);
}
1360
/// Entries appended to an empty manifest survive a serialize/reparse cycle in order.
#[test]
fn should_round_trip_append_serialize_reparse() {
    let mut manifest = Manifest::empty();
    manifest
        .append(&entry("x", vec![meta(0, 100, "data")]))
        .unwrap();
    manifest
        .append(&entry("y", vec![meta(0, 200, "more")]))
        .unwrap();

    let serialized = manifest.to_bytes().unwrap();
    let reparsed = Manifest::from_bytes(serialized).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);
}
1372
/// Two rounds of reparse-then-append accumulate all entries in order and keep
/// the sequence numbers increasing.
#[test]
fn should_chain_serialize_reparse_append() {
    let seed = Manifest::from_entries(&[entry_seq(0, "a", vec![])]);

    let mut first_round = Manifest::from_bytes(seed.to_bytes().unwrap()).unwrap();
    first_round.append(&entry("b", vec![])).unwrap();

    let mut second_round = Manifest::from_bytes(first_round.to_bytes().unwrap()).unwrap();
    second_round.append(&entry("c", vec![])).unwrap();

    let final_manifest = Manifest::from_bytes(second_round.to_bytes().unwrap()).unwrap();

    assert_eq!(final_manifest.entries_count(), 3);
    assert_eq!(collect_locations(&final_manifest), vec!["a", "b", "c"]);
    let decoded = final_manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    assert_eq!(decoded[2].sequence, 2);
}
1389
/// `dequeue(2)` on sequences 0..=4 removes 0, 1, 2, leaves 3 and 4, and does
/// not change `next_sequence`.
#[test]
fn should_dequeue_entries_through_sequence() {
    let mut manifest = Manifest::empty();
    (0..5).for_each(|_| {
        manifest.append(&entry("loc", vec![])).unwrap();
    });

    let removed = manifest.dequeue(2).unwrap();

    assert_eq!(removed.len(), 3);
    for expected in 0..3u64 {
        assert_eq!(removed[expected as usize].sequence, expected);
    }
    assert_eq!(manifest.entries_count(), 2);
    let remaining = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    assert_eq!(remaining[0].sequence, 3);
    assert_eq!(remaining[1].sequence, 4);
    assert_eq!(manifest.next_sequence, 5);
}
1409
/// Dequeuing through the last sequence empties the manifest but keeps
/// `next_sequence` where it was.
#[test]
fn should_dequeue_all_entries() {
    let mut manifest = Manifest::empty();
    (0..3).for_each(|_| {
        manifest.append(&entry("loc", vec![])).unwrap();
    });

    let removed = manifest.dequeue(2).unwrap();

    assert_eq!(removed.len(), 3);
    assert!(manifest.is_empty());
    assert_eq!(manifest.next_sequence, 3);
}
1423
1424 #[test]
1425 fn should_dequeue_nothing_when_sequence_below_first() {
1426 let entries = vec![
1427 entry_seq(5, "a", vec![]),
1428 entry_seq(6, "b", vec![]),
1429 entry_seq(7, "c", vec![]),
1430 ];
1431 let mut m = Manifest::from_entries(&entries);
1432
1433 let removed = m.dequeue(3).unwrap();
1434
1435 assert!(removed.is_empty());
1436 assert_eq!(m.entries_count(), 3);
1437 }
1438
/// After dequeuing the first entry, a new append continues from the previous
/// next sequence rather than reusing the removed one.
#[test]
fn should_append_after_dequeue() {
    let mut manifest = Manifest::empty();
    (0..3).for_each(|_| {
        manifest.append(&entry("loc", vec![])).unwrap();
    });

    manifest.dequeue(0).unwrap();

    assert_eq!(manifest.entries_count(), 2);
    let after_dequeue = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    assert_eq!(after_dequeue[0].sequence, 1);
    assert_eq!(after_dequeue[1].sequence, 2);

    manifest.append(&entry("new", vec![])).unwrap();
    let after_append = manifest
        .iter()
        .map(|item| item.unwrap())
        .collect::<Vec<QueueEntry>>();
    assert_eq!(after_append.len(), 3);
    assert_eq!(after_append[2].sequence, 3);
}
1458
1459 fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1461 let mut buf = BytesMut::new();
1462 Manifest::encode_entry(&mut buf, entry).unwrap();
1463 buf.to_vec()
1464 }
1465
1466 fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1469 let mut buf = BytesMut::new();
1470 buf.extend_from_slice(entry_bytes);
1471 buf.put_u32_le(1); buf.put_u64_le(1); buf.put_u64_le(0); buf.put_u16_le(MANIFEST_VERSION);
1475 Manifest::from_bytes(buf.freeze()).unwrap()
1476 }
1477
1478 const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;
1481
1482 fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
1486 let e = QueueEntry {
1487 sequence: 1,
1488 location: "a".to_string(),
1489 metadata: vec![],
1490 };
1491 let mut raw = encode_entry_bytes(&e);
1492 raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
1494 raw.extend_from_slice(extra_bytes);
1496 let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
1498 raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
1499 raw
1500 }
1501
/// Five stray bytes between a valid entry and the footer make iteration yield
/// the entry first and then a "did not consume all bytes" error.
#[test]
fn should_reject_trailing_bytes_before_footer() {
    let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
    // Append five 0xFF bytes after the valid entry.
    raw.resize(raw.len() + 5, 0xFFu8);
    let manifest = manifest_from_raw_entry(&raw);

    let items: Vec<Result<QueueEntry>> = manifest.iter().collect();

    assert_eq!(items.len(), 2);
    assert!(items[0].is_ok());
    let message = items[1].as_ref().unwrap_err().to_string();
    assert!(
        message.contains("did not consume all bytes"),
        "got: {}",
        message
    );
}
1519
/// An entry whose declared length is one byte below the fixed-field minimum
/// (sequence + location length + metadata count) is rejected.
#[test]
fn should_reject_entry_with_entry_len_below_minimum() {
    let minimum_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE;
    let bad_entry_len = (minimum_len - 1) as u32;

    let mut raw = bad_entry_len.to_le_bytes().to_vec();
    raw.extend_from_slice(&[0u8; 13]);
    let manifest = manifest_from_raw_entry(&raw);

    let message = manifest.iter().next().unwrap().unwrap_err().to_string();
    assert!(message.contains(
        "entry length 13 is less than minimum entry length 14 for the length of the location 0"
    ));
}
1534
/// An entry whose declared length is one byte short of what its three-byte
/// location requires is rejected.
#[test]
fn should_reject_entry_with_entry_len_below_minimum_for_location() {
    let location = "abc";
    let required_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + location.len();
    let bad_entry_len = (required_len - 1) as u32;

    // Layout: entry length, sequence 0, location length, then 20 zero bytes.
    let mut raw = bad_entry_len.to_le_bytes().to_vec();
    raw.extend_from_slice(&0u64.to_le_bytes());
    raw.extend_from_slice(&(location.len() as u16).to_le_bytes());
    raw.extend_from_slice(&[0u8; 20]);
    let manifest = manifest_from_raw_entry(&raw);

    let message = manifest.iter().next().unwrap().unwrap_err().to_string();
    assert!(
        message.contains("entry length 16 is less than minimum entry length 17"),
        "got: {}",
        message
    );
}
1556
/// Two bytes of entry data are too few to hold the entry-length field, so
/// decoding reports a serialization error.
#[test]
fn should_reject_entry_truncated_before_entry_len() {
    let manifest = manifest_from_raw_entry(&[0u8; 2]);

    let err = manifest.iter().next().unwrap().unwrap_err();

    let is_expected = match &err {
        Error::Serialization(msg) => msg.contains("entry length field does not fit"),
        _ => false,
    };
    assert!(is_expected, "unexpected error: {err}");
}
1567
/// A metadata count of 1 with no metadata bytes after it fails while reading
/// the start index.
#[test]
fn should_reject_entry_truncated_before_metadata_start_index() {
    let raw = corrupt_metadata_entry(1, &[]);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();

    let is_expected = match &err {
        Error::Serialization(msg) => msg.contains("start index field does not fit"),
        _ => false,
    };
    assert!(is_expected, "unexpected error: {err}");
}
1578
/// Metadata that ends right after the start index fails while reading the
/// ingestion time.
#[test]
fn should_reject_entry_truncated_before_metadata_ingestion_time() {
    // Only the u32 start index is present.
    let start_index_only = 0u32.to_le_bytes();
    let raw = corrupt_metadata_entry(1, &start_index_only);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();

    let is_expected = match &err {
        Error::Serialization(msg) => msg.contains("ingestion time field does not fit"),
        _ => false,
    };
    assert!(is_expected, "unexpected error: {err}");
}
1589
/// Metadata that ends right after the ingestion time fails while reading the
/// metadata length.
#[test]
fn should_reject_entry_truncated_before_metadata_length() {
    // Start index (u32) followed by ingestion time (i64); nothing after.
    let mut partial = 0u32.to_le_bytes().to_vec();
    partial.extend_from_slice(&0i64.to_le_bytes());
    let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &partial));

    let err = manifest.iter().next().unwrap().unwrap_err();

    let is_expected = match &err {
        Error::Serialization(msg) => msg.contains("metadata length field does not fit"),
        _ => false,
    };
    assert!(is_expected, "unexpected error: {err}");
}
1603
/// Metadata that declares a 10-byte payload but supplies only 2 bytes is rejected.
#[test]
fn should_reject_entry_truncated_before_metadata_payload() {
    // Start index, ingestion time, declared payload length 10, then 2 payload bytes.
    let mut partial = 0u32.to_le_bytes().to_vec();
    partial.extend_from_slice(&0i64.to_le_bytes());
    partial.extend_from_slice(&10u32.to_le_bytes());
    partial.extend_from_slice(&[0xAB, 0xCD]);
    let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &partial));

    let err = manifest.iter().next().unwrap().unwrap_err();

    let is_expected = match &err {
        Error::Serialization(msg) => msg.contains("metadata has less bytes than set"),
        _ => false,
    };
    assert!(is_expected, "unexpected error: {err}");
}
1619
1620 #[tokio::test]
1621 async fn should_reject_location_exceeding_u16_max() {
1622 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1623 let producer =
1624 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1625
1626 let long_location = "x".repeat(u16::MAX as usize + 1);
1627 let result = producer.enqueue(long_location, vec![]).await;
1628 assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1629 }
1630}