1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7 Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// On-disk format version written into, and required in, every manifest footer.
const MANIFEST_VERSION: u16 = 1;
/// Epoch held by a `QueueConsumer` before `initialize` succeeds; `initialize` never
/// claims this value as a real epoch.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;

// Byte widths of the little-endian fields of one encoded entry, derived from the
// integer type each field is written as.
const ENTRY_LEN_SIZE: usize = std::mem::size_of::<u32>();
const SEQUENCE_SIZE: usize = std::mem::size_of::<u64>();
const LOCATION_LEN_SIZE: usize = std::mem::size_of::<u16>();
const METADATA_COUNT_SIZE: usize = std::mem::size_of::<u32>();
const START_INDEX_SIZE: usize = std::mem::size_of::<u32>();
const INGESTION_TIME_MS_SIZE: usize = std::mem::size_of::<i64>();
const METADATA_LEN_SIZE: usize = std::mem::size_of::<u32>();

// Byte widths of the fields of the fixed-size trailing footer.
const ENTRIES_COUNT_SIZE: usize = std::mem::size_of::<u32>();
const EPOCH_SIZE: usize = std::mem::size_of::<u64>();
const VERSION_SIZE: usize = std::mem::size_of::<u16>();
/// Footer layout: entries count (u32) | next sequence (u64) | epoch (u64) | version (u16).
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
/// One metadata record attached to a queue entry.
///
/// Encoded inside an entry as `start_index` (u32 LE), `ingestion_time_ms` (i64 LE),
/// payload length (u32 LE) and the payload bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    /// Stored verbatim; presumably a starting index within the referenced object —
    /// TODO confirm against producers.
    pub start_index: u32,
    /// Stored verbatim; the name suggests an ingestion timestamp in milliseconds
    /// (epoch and clock not established here — confirm).
    pub ingestion_time_ms: i64,
    /// Opaque payload; must be at most `u32::MAX` bytes to be encodable.
    pub payload: Bytes,
}
36
37#[derive(Debug, Clone)]
38pub(crate) struct QueueEntry {
39 pub(crate) sequence: u64,
40 pub(crate) location: String,
41 pub(crate) metadata: Vec<Metadata>,
42}
43
44impl QueueEntry {
45 fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46 if location.len() > u16::MAX as usize {
47 return Err(Error::InvalidInput(format!(
48 "location length {} exceeds u16::MAX",
49 location.len()
50 )));
51 }
52 if metadata.len() > u32::MAX as usize {
53 return Err(Error::InvalidInput(format!(
54 "metadata count {} exceeds u32::MAX",
55 metadata.len()
56 )));
57 }
58 Ok(Self {
59 sequence: 0,
60 location,
61 metadata,
62 })
63 }
64
65 fn clone_with_sequence(&self, sequence: u64) -> Self {
66 Self {
67 sequence,
68 ..self.clone()
69 }
70 }
71}
72
/// In-memory queue manifest: an encoded snapshot (`data`) plus a buffer of entries
/// appended since the snapshot was loaded.
///
/// Encoded layout of `data`: zero or more entries, each a u32 LE body length
/// followed by the body, then the fixed-size footer
/// (entries count u32 | next sequence u64 | epoch u64 | version u16).
#[derive(Debug, Clone)]
pub(crate) struct Manifest {
    // Encoded entries followed by the footer.
    data: Bytes,
    // Encoded entries appended after `data` was loaded; not counted in `data`'s footer.
    appended: BytesMut,
    // Number of entries encoded in `appended`.
    appended_count: usize,
    // Sequence number the next appended entry will receive.
    next_sequence: u64,
    // Epoch of the consumer that last claimed the queue; compared by the consumer to
    // detect that it has been fenced.
    epoch: u64,
}
81
82impl Manifest {
83 fn empty() -> Self {
85 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
86 buf.put_u32_le(0);
87 buf.put_u64_le(0);
88 buf.put_u64_le(0);
89 buf.put_u16_le(MANIFEST_VERSION);
90 Self {
91 data: buf.freeze(),
92 appended: BytesMut::new(),
93 appended_count: 0,
94 next_sequence: 0,
95 epoch: 0,
96 }
97 }
98
99 pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
101 if data.is_empty() {
102 return Err(Error::Serialization(
103 "queue manifest data must not be empty".to_string(),
104 ));
105 }
106 if data.len() < FOOTER_SIZE {
107 return Err(Error::Serialization(
108 "queue manifest too short for footer".to_string(),
109 ));
110 }
111 let version_start = data.len() - VERSION_SIZE;
112 let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
113 if version != MANIFEST_VERSION {
114 return Err(Error::Serialization(format!(
115 "unsupported queue manifest version: {}",
116 version
117 )));
118 }
119 let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
120 let epoch = u64::from_le_bytes(
121 data[epoch_start..epoch_start + EPOCH_SIZE]
122 .try_into()
123 .unwrap(),
124 );
125 let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
126 let next_sequence = u64::from_le_bytes(
127 data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
128 .try_into()
129 .unwrap(),
130 );
131 Ok(Self {
132 data,
133 appended: BytesMut::new(),
134 appended_count: 0,
135 next_sequence,
136 epoch,
137 })
138 }
139
140 #[cfg(test)]
142 fn from_entries(entries: &[QueueEntry]) -> Self {
143 let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
144 let mut buf = BytesMut::new();
145 for entry in entries {
146 Self::encode_entry(&mut buf, entry).unwrap();
147 }
148 buf.put_u32_le(entries.len() as u32);
149 buf.put_u64_le(next_sequence);
150 buf.put_u64_le(0);
151 buf.put_u16_le(MANIFEST_VERSION);
152 Self {
153 data: buf.freeze(),
154 appended: BytesMut::new(),
155 appended_count: 0,
156 next_sequence,
157 epoch: 0,
158 }
159 }
160
161 fn entries_count(&self) -> usize {
163 let base = self.existing_entries_count();
164 base + self.appended_count
165 }
166
167 #[cfg(test)]
169 fn is_empty(&self) -> bool {
170 self.entries_count() == 0
171 }
172
173 pub(crate) fn iter(&self) -> ManifestIter<'_> {
175 let base_count = self.existing_entries_count();
176 let entries_end = if self.data.is_empty() {
177 0
178 } else {
179 self.data.len() - FOOTER_SIZE
180 };
181 ManifestIter {
182 data: &self.data,
183 offset: 0,
184 remaining: base_count,
185 entries_end,
186 appended: &self.appended,
187 appended_offset: 0,
188 appended_remaining: self.appended_count,
189 }
190 }
191
192 fn existing_entries_count(&self) -> usize {
193 if self.data.is_empty() {
194 0
195 } else {
196 let footer_start = self.data.len() - FOOTER_SIZE;
197 u32::from_le_bytes(
198 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
199 .try_into()
200 .unwrap(),
201 ) as usize
202 }
203 }
204
205 fn append(&mut self, entry: &QueueEntry) -> Result<()> {
209 let sequenced = entry.clone_with_sequence(self.next_sequence);
210 Self::encode_entry(&mut self.appended, &sequenced)?;
211 self.next_sequence += 1;
212 self.appended_count += 1;
213 Ok(())
214 }
215
216 fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
221 let next_seq = self.next_sequence;
222 let epoch = self.epoch;
223
224 let base_count = self.existing_entries_count();
225 let entries_end = if self.data.is_empty() {
226 0
227 } else {
228 self.data.len() - FOOTER_SIZE
229 };
230
231 let (mut removed, remaining_base_start, remaining_base_count) =
232 split_entries(&self.data, base_count, entries_end, through_sequence)?;
233
234 let appended_end = self.appended.len();
235 let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
236 &self.appended,
237 self.appended_count,
238 appended_end,
239 through_sequence,
240 )?;
241 removed.extend(appended_removed);
242
243 let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
244 let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
245 let total_remaining = remaining_base_count + remaining_appended_count;
246
247 let mut buf = BytesMut::with_capacity(
248 remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
249 );
250 buf.extend_from_slice(remaining_base_bytes);
251 buf.extend_from_slice(remaining_appended_bytes);
252 buf.put_u32_le(total_remaining);
253 buf.put_u64_le(next_seq);
254 buf.put_u64_le(epoch);
255 buf.put_u16_le(MANIFEST_VERSION);
256
257 self.data = buf.freeze();
258 self.appended = BytesMut::new();
259 self.appended_count = 0;
260 self.next_sequence = next_seq;
261 self.epoch = epoch;
262
263 Ok(removed)
264 }
265
266 fn set_epoch(&mut self, epoch: u64) {
268 self.epoch = epoch;
269 let mut buf = BytesMut::from(self.data.as_ref());
270 let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
271 buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
272 self.data = buf.freeze();
273 }
274
275 fn to_bytes(&self) -> Result<Bytes> {
279 if self.appended.is_empty() {
280 return Ok(self.data.clone());
281 }
282 let (prefix, base_count) = if self.data.is_empty() {
283 (&[] as &[u8], 0u32)
284 } else {
285 let footer_start = self.data.len() - FOOTER_SIZE;
286 let count = u32::from_le_bytes(
287 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
288 .try_into()
289 .unwrap(),
290 );
291 (&self.data[..footer_start], count)
292 };
293 let total_count: u32 = base_count
294 .checked_add(self.appended_count as u32)
295 .ok_or_else(|| {
296 Error::Serialization(format!(
297 "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
298 base_count, self.appended_count
299 ))
300 })?;
301 let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
302 buf.extend_from_slice(prefix);
303 buf.extend_from_slice(&self.appended);
304 buf.put_u32_le(total_count);
305 buf.put_u64_le(self.next_sequence);
306 buf.put_u64_le(self.epoch);
307 buf.put_u16_le(MANIFEST_VERSION);
308 Ok(buf.freeze())
309 }
310
311 fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
312 debug_assert!(entry.location.len() <= u16::MAX as usize);
313 let metadata_size: usize = METADATA_COUNT_SIZE
314 + entry
315 .metadata
316 .iter()
317 .map(|m| {
318 START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
319 })
320 .sum::<usize>();
321 let entry_body_len =
322 SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
323 debug_assert!(entry_body_len <= u32::MAX as usize);
324 buf.put_u32_le(entry_body_len as u32);
325 buf.put_u64_le(entry.sequence);
326 buf.put_u16_le(entry.location.len() as u16);
327 buf.extend_from_slice(entry.location.as_bytes());
328 debug_assert!(entry.metadata.len() <= u32::MAX as usize);
329 buf.put_u32_le(entry.metadata.len() as u32);
330 for m in &entry.metadata {
331 if m.payload.len() > u32::MAX as usize {
332 return Err(Error::InvalidInput(format!(
333 "metadata payload size {} exceeds u32::MAX",
334 m.payload.len()
335 )));
336 }
337 buf.put_u32_le(m.start_index);
338 buf.put_i64_le(m.ingestion_time_ms);
339 buf.put_u32_le(m.payload.len() as u32);
340 buf.extend_from_slice(&m.payload);
341 }
342 Ok(())
343 }
344}
345
346fn split_entries(
350 data: &[u8],
351 count: usize,
352 end: usize,
353 through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355 let mut removed = Vec::new();
356 let mut offset = 0usize;
357
358 for i in 0..count {
359 let entry_start = offset;
360 let entry = decode_entry(data, &mut offset, end)?;
361
362 if entry.sequence <= through_sequence {
363 removed.push(entry);
364 } else {
365 return Ok((removed, entry_start, (count - i) as u32));
366 }
367 }
368
369 Ok((removed, end, 0))
370}
371
372fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374 if *offset + ENTRY_LEN_SIZE > end {
375 return Err(Error::Serialization(
376 "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377 ));
378 }
379
380 let entry_len =
381 u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382 *offset += ENTRY_LEN_SIZE;
383
384 if *offset + entry_len > end {
385 return Err(Error::Serialization(
386 "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387 ));
388 }
389
390 let entry_end = *offset + entry_len;
391
392 let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393 *offset += SEQUENCE_SIZE;
394
395 let location_len = u16::from_le_bytes(
396 data[*offset..*offset + LOCATION_LEN_SIZE]
397 .try_into()
398 .unwrap(),
399 ) as usize;
400 *offset += LOCATION_LEN_SIZE;
401
402 let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403 if entry_len < min_entry_len {
404 return Err(Error::Serialization(format!(
405 "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406 entry_len, min_entry_len, location_len
407 )));
408 }
409
410 let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411 .map_err(|e| Error::Serialization(e.to_string()))?;
412 *offset += location_len;
413
414 let metadata_count = u32::from_le_bytes(
415 data[*offset..*offset + METADATA_COUNT_SIZE]
416 .try_into()
417 .unwrap(),
418 ) as usize;
419 *offset += METADATA_COUNT_SIZE;
420
421 let mut metadata = Vec::with_capacity(metadata_count);
422 for _ in 0..metadata_count {
423 if *offset + START_INDEX_SIZE > end {
424 return Err(Error::Serialization(
425 "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426 ));
427 }
428 let start_index = u32::from_le_bytes(
429 data[*offset..*offset + START_INDEX_SIZE]
430 .try_into()
431 .unwrap(),
432 );
433 *offset += START_INDEX_SIZE;
434
435 if *offset + INGESTION_TIME_MS_SIZE > end {
436 return Err(Error::Serialization(
437 "queue entry corrupt: size of ingestion time field does not fit in entry"
438 .to_string(),
439 ));
440 }
441 let ingestion_time_ms = i64::from_le_bytes(
442 data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443 .try_into()
444 .unwrap(),
445 );
446 *offset += INGESTION_TIME_MS_SIZE;
447
448 if *offset + METADATA_LEN_SIZE > end {
449 return Err(Error::Serialization(
450 "queue entry corrupt: size of metadata length field does not fit in entry"
451 .to_string(),
452 ));
453 }
454 let m_len = u32::from_le_bytes(
455 data[*offset..*offset + METADATA_LEN_SIZE]
456 .try_into()
457 .unwrap(),
458 ) as usize;
459 *offset += METADATA_LEN_SIZE;
460
461 if *offset + m_len > end {
462 return Err(Error::Serialization(
463 "queue entry corrupt: metadata has less bytes than set in the metadata length"
464 .to_string(),
465 ));
466 }
467 metadata.push(Metadata {
468 start_index,
469 ingestion_time_ms,
470 payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471 });
472 *offset += m_len;
473 }
474
475 *offset = entry_end;
476
477 Ok(QueueEntry {
478 sequence,
479 location,
480 metadata,
481 })
482}
483
484pub(crate) struct ManifestIter<'a> {
486 data: &'a [u8],
487 offset: usize,
488 remaining: usize,
489 entries_end: usize,
490 appended: &'a [u8],
491 appended_offset: usize,
492 appended_remaining: usize,
493}
494
495impl Iterator for ManifestIter<'_> {
496 type Item = Result<QueueEntry>;
497
498 fn next(&mut self) -> Option<Self::Item> {
499 if self.remaining > 0 {
500 self.remaining -= 1;
501 Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502 } else if self.appended_remaining > 0 {
503 self.appended_remaining -= 1;
504 Some(decode_entry(
505 self.appended,
506 &mut self.appended_offset,
507 self.appended.len(),
508 ))
509 } else if self.offset != self.entries_end {
510 let err = Some(Err(Error::Serialization(format!(
511 "base entries did not consume all bytes: offset {} != entries_end {}",
512 self.offset, self.entries_end
513 ))));
514 self.offset = self.entries_end;
515 err
516 } else {
517 None
518 }
519 }
520}
521
/// Why a conditional manifest write failed.
enum ManifestWriteError {
    /// The stored manifest changed, or was created, since it was read; callers
    /// re-read and retry.
    Conflict,
    /// Any other failure; callers return it without retrying.
    Fatal(Error),
}
526
527#[derive(Clone)]
528pub(crate) struct ManifestStore {
529 pub(crate) object_store: Arc<dyn ObjectStore>,
530 pub(crate) manifest_path: String,
531}
532
533impl ManifestStore {
534 pub(crate) async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
535 let path = Path::from(self.manifest_path.as_str());
536 match self.object_store.get(&path).await {
537 Ok(result) => {
538 let version = UpdateVersion {
539 e_tag: result.meta.e_tag.clone(),
540 version: result.meta.version.clone(),
541 };
542 let bytes = result
543 .bytes()
544 .await
545 .map_err(|e| Error::Storage(e.to_string()))?;
546 let manifest = Manifest::from_bytes(bytes)?;
547 Ok((manifest, Some(version)))
548 }
549 Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
550 Err(e) => Err(Error::Storage(e.to_string())),
551 }
552 }
553
554 async fn write(
555 &self,
556 manifest: &Manifest,
557 version: Option<UpdateVersion>,
558 ) -> std::result::Result<(), ManifestWriteError> {
559 let path = Path::from(self.manifest_path.as_str());
560 let put_mode = match version {
561 Some(v) => PutMode::Update(v),
562 None => PutMode::Create,
563 };
564 let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;
565
566 match self
567 .object_store
568 .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
569 .await
570 {
571 Ok(_) => Ok(()),
572 Err(ObjectStoreError::Precondition { .. })
573 | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
574 Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
575 }
576 }
577}
578
579struct ConflictCounter {
580 write_count: AtomicU64,
581 conflict_count: AtomicU64,
582 role: &'static str,
583}
584
585impl ConflictCounter {
586 fn new(role: &'static str) -> Self {
587 Self {
588 write_count: AtomicU64::new(0),
589 conflict_count: AtomicU64::new(0),
590 role,
591 }
592 }
593
594 fn record_write(&self) {
595 self.write_count.fetch_add(1, Ordering::Relaxed);
596 metrics::counter!(crate::metric_names::MANIFEST_WRITES, "role" => self.role).increment(1);
597 }
598
599 fn record_conflict(&self) {
600 self.conflict_count.fetch_add(1, Ordering::Relaxed);
601 metrics::counter!(crate::metric_names::MANIFEST_CONFLICTS, "role" => self.role)
602 .increment(1);
603 }
604
605 fn conflict_rate(&self) -> f64 {
606 let writes = self.write_count.load(Ordering::Relaxed);
607 if writes == 0 {
608 return 0.0;
609 }
610 let conflicts = self.conflict_count.load(Ordering::Relaxed);
611 let rate = (conflicts as f64 / writes as f64) * 100.0;
612 rate.min(100.0)
613 }
614}
615
616pub struct QueueProducer {
622 manifest_store: ManifestStore,
623 counter: ConflictCounter,
624}
625
626impl QueueProducer {
627 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
629 Self {
630 manifest_store: ManifestStore {
631 object_store,
632 manifest_path,
633 },
634 counter: ConflictCounter::new("producer"),
635 }
636 }
637
638 pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
644 let entry = QueueEntry::new(location, metadata)?;
645 loop {
646 let (mut manifest, version) = self.manifest_store.read().await?;
647 manifest.append(&entry)?;
648 self.counter.record_write();
649 match self.manifest_store.write(&manifest, version).await {
650 Ok(()) => return Ok(()),
651 Err(ManifestWriteError::Conflict) => {
652 self.counter.record_conflict();
653 continue;
654 }
655 Err(ManifestWriteError::Fatal(e)) => return Err(e),
656 }
657 }
658 }
659
660 pub fn conflict_rate(&self) -> f64 {
662 self.counter.conflict_rate()
663 }
664}
665
666pub struct QueueConsumer {
675 manifest_store: ManifestStore,
676 epoch: AtomicU64,
677 counter: ConflictCounter,
678 queue_len: AtomicU64,
679}
680
681impl QueueConsumer {
682 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
686 Self {
687 manifest_store: ManifestStore {
688 object_store,
689 manifest_path,
690 },
691 epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
692 counter: ConflictCounter::new("consumer"),
693 queue_len: AtomicU64::new(0),
694 }
695 }
696
697 pub async fn initialize(&self) -> Result<()> {
700 loop {
701 let (mut manifest, version) = self.read_manifest().await?;
702 let mut new_epoch = manifest.epoch.wrapping_add(1);
703 if new_epoch == UNINITIALIZED_EPOCH {
704 new_epoch = new_epoch.wrapping_add(1);
705 }
706 manifest.set_epoch(new_epoch);
707 match self.write_manifest(&manifest, version).await {
708 Ok(()) => {
709 self.epoch.store(new_epoch, Ordering::Relaxed);
710 return Ok(());
711 }
712 Err(ManifestWriteError::Conflict) => {
713 self.counter.record_conflict();
714 continue;
715 }
716 Err(ManifestWriteError::Fatal(e)) => return Err(e),
717 }
718 }
719 }
720
721 #[allow(dead_code)]
728 pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
729 let (manifest, _) = self.read_manifest().await?;
730 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
731 return Err(Error::Fenced);
732 }
733 manifest.iter().next().transpose()
734 }
735
736 pub(crate) async fn descriptors_after(
745 &self,
746 after_sequence: Option<u64>,
747 max: usize,
748 ) -> Result<Vec<QueueEntry>> {
749 if max == 0 {
750 return Ok(Vec::new());
751 }
752 let (manifest, _) = self.read_manifest().await?;
753 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
754 return Err(Error::Fenced);
755 }
756 let mut out = Vec::with_capacity(max);
757 for entry in manifest.iter() {
758 let entry = entry?;
759 if let Some(after) = after_sequence
760 && entry.sequence <= after
761 {
762 continue;
763 }
764 out.push(entry);
765 if out.len() >= max {
766 break;
767 }
768 }
769 Ok(out)
770 }
771
772 #[allow(dead_code)]
778 pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
779 let (manifest, _) = self.read_manifest().await?;
780 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
781 return Err(Error::Fenced);
782 }
783 manifest
784 .iter()
785 .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
786 .transpose()
787 }
788
789 pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
792 loop {
793 let (mut manifest, version) = self.read_manifest().await?;
794 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
795 return Err(Error::Fenced);
796 }
797 let removed = manifest.dequeue(through_sequence)?;
798 match self.write_manifest(&manifest, version).await {
799 Ok(()) => return Ok(removed),
800 Err(ManifestWriteError::Conflict) => {
801 self.counter.record_conflict();
802 continue;
803 }
804 Err(ManifestWriteError::Fatal(e)) => return Err(e),
805 }
806 }
807 }
808
809 pub fn len(&self) -> usize {
811 self.queue_len.load(Ordering::Relaxed) as usize
812 }
813
814 async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
815 let result = self.manifest_store.read().await?;
816 self.queue_len
817 .store(result.0.entries_count() as u64, Ordering::Relaxed);
818 Ok(result)
819 }
820
821 async fn write_manifest(
822 &self,
823 manifest: &Manifest,
824 version: Option<UpdateVersion>,
825 ) -> std::result::Result<(), ManifestWriteError> {
826 self.counter.record_write();
827 let result = self.manifest_store.write(manifest, version).await;
828 if result.is_ok() {
829 self.queue_len
830 .store(manifest.entries_count() as u64, Ordering::Relaxed);
831 }
832 result
833 }
834
835 pub fn conflict_rate(&self) -> f64 {
837 self.counter.conflict_rate()
838 }
839}
840
/// Public, decoded form of one manifest entry, as returned by `parse_manifest`.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestEntry {
    /// Queue sequence number assigned when the entry was appended.
    pub sequence: u64,
    /// Object-store location of the queued item.
    pub location: String,
    /// Metadata records stored with the entry.
    pub metadata: Vec<Metadata>,
}
848
849#[derive(Debug, Clone)]
851pub struct ManifestView {
852 pub epoch: u64,
853 pub next_sequence: u64,
854 entries: Vec<ManifestEntry>,
855}
856
857impl ManifestView {
858 pub fn entries(&self) -> &[ManifestEntry] {
860 &self.entries
861 }
862}
863
864pub fn parse_manifest(data: Bytes) -> Result<ManifestView> {
866 let manifest = Manifest::from_bytes(data)?;
867 let entries = manifest
868 .iter()
869 .map(|r| {
870 r.map(|e| ManifestEntry {
871 sequence: e.sequence,
872 location: e.location,
873 metadata: e.metadata,
874 })
875 })
876 .collect::<Result<Vec<_>>>()?;
877 Ok(ManifestView {
878 epoch: manifest.epoch,
879 next_sequence: manifest.next_sequence,
880 entries,
881 })
882}
883
884#[cfg(test)]
885mod tests {
886 use super::*;
887 use slatedb::object_store::memory::InMemory;
888
889 const TEST_MANIFEST_PATH: &str = "test/manifest";
890
891 async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
892 let path = Path::from(path);
893 let result = store.get(&path).await.unwrap();
894 let bytes = result.bytes().await.unwrap();
895 Manifest::from_bytes(bytes).unwrap()
896 }
897
898 #[tokio::test]
899 async fn should_initialize_consumer_and_increment_epoch() {
900 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
901 let consumer =
902 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
903
904 consumer.initialize().await.unwrap();
905
906 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
907 assert_eq!(manifest.epoch, 1);
908 }
909
910 #[tokio::test]
911 async fn should_peek_none_when_queue_is_empty() {
912 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
913 let consumer =
914 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
915 consumer.initialize().await.unwrap();
916
917 let result = consumer.peek().await.unwrap();
918 assert!(result.is_none());
919 }
920
921 #[tokio::test]
922 async fn should_read_entry_by_sequence() {
923 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
924 let producer =
925 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
926
927 producer
928 .enqueue("a.batch".to_string(), vec![])
929 .await
930 .unwrap();
931 producer
932 .enqueue("b.batch".to_string(), vec![])
933 .await
934 .unwrap();
935 producer
936 .enqueue("c.batch".to_string(), vec![])
937 .await
938 .unwrap();
939
940 let consumer =
941 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
942 consumer.initialize().await.unwrap();
943
944 let entry = consumer.read(1).await.unwrap().unwrap();
945 assert_eq!(entry.location, "b.batch");
946 assert_eq!(entry.sequence, 1);
947 }
948
949 #[tokio::test]
950 async fn should_read_none_for_missing_sequence() {
951 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
952 let producer =
953 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
954
955 producer
956 .enqueue("a.batch".to_string(), vec![])
957 .await
958 .unwrap();
959
960 let consumer =
961 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
962 consumer.initialize().await.unwrap();
963
964 let result = consumer.read(99).await.unwrap();
965 assert!(result.is_none());
966 }
967
968 #[tokio::test]
969 async fn should_fence_old_consumer_on_peek() {
970 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
971 let consumer_a =
972 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
973 consumer_a.initialize().await.unwrap();
974
975 let consumer_b =
976 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
977 consumer_b.initialize().await.unwrap();
978
979 let result = consumer_a.peek().await;
980 assert!(matches!(result, Err(Error::Fenced)));
981 }
982
983 #[tokio::test]
984 async fn should_fence_old_consumer_on_dequeue() {
985 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
986 let consumer_a =
987 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
988 consumer_a.initialize().await.unwrap();
989
990 let consumer_b =
991 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
992 consumer_b.initialize().await.unwrap();
993
994 let result = consumer_a.dequeue(0).await;
995 assert!(matches!(result, Err(Error::Fenced)));
996 }
997
998 #[tokio::test]
999 async fn should_fence_uninitialized_consumer() {
1000 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1001 let producer =
1002 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1003
1004 producer
1005 .enqueue("a.batch".to_string(), vec![])
1006 .await
1007 .unwrap();
1008
1009 let consumer =
1010 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1011
1012 let result = consumer.peek().await;
1013 assert!(matches!(result, Err(Error::Fenced)));
1014 }
1015
1016 #[tokio::test]
1017 async fn should_wrap_epoch_to_zero_at_max() {
1018 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1019
1020 let mut manifest = Manifest::empty();
1021 manifest.set_epoch(u64::MAX - 1);
1022 let path = Path::from(TEST_MANIFEST_PATH);
1023 store
1024 .put(
1025 &path,
1026 PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
1027 )
1028 .await
1029 .unwrap();
1030
1031 let consumer =
1032 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1033 consumer.initialize().await.unwrap();
1034
1035 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1036 assert_eq!(manifest.epoch, 0);
1037 }
1038
1039 #[tokio::test]
1040 async fn should_peek_first_entry_with_valid_epoch() {
1041 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1042 let producer =
1043 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1044
1045 producer
1046 .enqueue("a.batch".to_string(), vec![])
1047 .await
1048 .unwrap();
1049 producer
1050 .enqueue("b.batch".to_string(), vec![])
1051 .await
1052 .unwrap();
1053
1054 let consumer =
1055 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1056 consumer.initialize().await.unwrap();
1057
1058 let entry = consumer.peek().await.unwrap().unwrap();
1059 assert_eq!(entry.location, "a.batch");
1060 }
1061
1062 #[tokio::test]
1063 async fn should_dequeue_entries_with_valid_epoch() {
1064 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1065 let producer =
1066 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1067
1068 producer
1069 .enqueue("a.batch".to_string(), vec![])
1070 .await
1071 .unwrap();
1072 producer
1073 .enqueue("b.batch".to_string(), vec![])
1074 .await
1075 .unwrap();
1076 producer
1077 .enqueue("c.batch".to_string(), vec![])
1078 .await
1079 .unwrap();
1080
1081 let consumer =
1082 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1083 consumer.initialize().await.unwrap();
1084
1085 let removed = consumer.dequeue(1).await.unwrap();
1086 assert_eq!(removed.len(), 2);
1087 assert_eq!(removed[0].location, "a.batch");
1088 assert_eq!(removed[1].location, "b.batch");
1089
1090 let next = consumer.peek().await.unwrap().unwrap();
1091 assert_eq!(next.location, "c.batch");
1092 }
1093
1094 #[tokio::test]
1095 async fn should_enqueue_after_consumer_dequeue() {
1096 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1097 let producer =
1098 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1099
1100 producer
1101 .enqueue("a.batch".to_string(), vec![])
1102 .await
1103 .unwrap();
1104 producer
1105 .enqueue("b.batch".to_string(), vec![])
1106 .await
1107 .unwrap();
1108
1109 let consumer =
1110 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1111 consumer.initialize().await.unwrap();
1112
1113 consumer.dequeue(1).await.unwrap();
1114
1115 producer
1116 .enqueue("c.batch".to_string(), vec![])
1117 .await
1118 .unwrap();
1119
1120 let next = consumer.peek().await.unwrap().unwrap();
1121 assert_eq!(next.location, "c.batch");
1122 assert_eq!(next.sequence, 2);
1123 }
1124
1125 #[tokio::test]
1126 async fn should_enqueue_locations_to_manifest() {
1127 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1128 let producer =
1129 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1130
1131 producer
1132 .enqueue("path/to/file1.batch".to_string(), vec![])
1133 .await
1134 .unwrap();
1135 producer
1136 .enqueue("path/to/file2.batch".to_string(), vec![])
1137 .await
1138 .unwrap();
1139
1140 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1141 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1142 assert_eq!(
1143 locations,
1144 vec!["path/to/file1.batch", "path/to/file2.batch"]
1145 );
1146 }
1147
1148 #[tokio::test]
1149 async fn should_merge_with_existing_manifest() {
1150 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1151
1152 let existing = Manifest::from_entries(&[QueueEntry {
1153 sequence: 0,
1154 location: "existing/file.batch".to_string(),
1155 metadata: vec![],
1156 }]);
1157 let path = Path::from(TEST_MANIFEST_PATH);
1158 store
1159 .put(
1160 &path,
1161 PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1162 )
1163 .await
1164 .unwrap();
1165
1166 let producer =
1167 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1168 producer
1169 .enqueue("new/file.batch".to_string(), vec![])
1170 .await
1171 .unwrap();
1172
1173 let manifest = read_producer_manifest(&store, "test/manifest").await;
1174 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1175 assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1176 }
1177
1178 fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1179 QueueEntry::new(location.to_string(), metadata).unwrap()
1180 }
1181
1182 fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1183 QueueEntry {
1184 sequence: seq,
1185 location: location.to_string(),
1186 metadata,
1187 }
1188 }
1189
1190 fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1191 Metadata {
1192 start_index,
1193 ingestion_time_ms: time_ms,
1194 payload: Bytes::from(data.to_string()),
1195 }
1196 }
1197
1198 fn collect_locations(manifest: &Manifest) -> Vec<String> {
1199 manifest.iter().map(|e| e.unwrap().location).collect()
1200 }
1201
#[test]
fn should_create_empty_manifest() {
    let manifest = Manifest::empty();

    assert_eq!(manifest.entries_count(), 0);
    assert!(manifest.is_empty());
    assert_eq!(manifest.epoch, 0);

    // With no entries, the serialized form is just the footer:
    // entries_count (u32) | next_sequence (u64) | epoch (u64) | version (u16), little-endian.
    let encoded = manifest.to_bytes().unwrap();
    assert_eq!(encoded.len(), FOOTER_SIZE);

    let entries_count = u32::from_le_bytes(encoded[0..4].try_into().unwrap());
    let next_sequence = u64::from_le_bytes(encoded[4..12].try_into().unwrap());
    let epoch = u64::from_le_bytes(encoded[12..20].try_into().unwrap());
    let version = u16::from_le_bytes(encoded[20..22].try_into().unwrap());

    assert_eq!(entries_count, 0);
    assert_eq!(next_sequence, 0);
    assert_eq!(epoch, 0);
    assert_eq!(version, MANIFEST_VERSION);
}
1220
#[test]
fn should_parse_valid_manifest_bytes() {
    let encoded = Manifest::from_entries(&[
        entry_seq(0, "a", vec![meta(0, 1, "x")]),
        entry_seq(1, "b", vec![meta(0, 2, "y")]),
    ])
    .to_bytes()
    .unwrap();

    let parsed = Manifest::from_bytes(encoded).unwrap();

    assert_eq!(parsed.entries_count(), 2);
}
1233
#[test]
fn should_parse_footer_only_bytes() {
    // Hand-built footer: zero entries, but a next_sequence of 42.
    let mut footer = BytesMut::with_capacity(FOOTER_SIZE);
    footer.put_u32_le(0); // entries_count
    footer.put_u64_le(42); // next_sequence
    footer.put_u64_le(0); // epoch
    footer.put_u16_le(MANIFEST_VERSION);

    let mut manifest = Manifest::from_bytes(footer.freeze()).unwrap();

    assert_eq!(manifest.entries_count(), 0);
    assert_eq!(manifest.epoch, 0);

    // The first appended entry must pick up the sequence recorded in the footer.
    manifest.append(&entry("loc", vec![])).unwrap();
    let first = manifest.iter().next().unwrap().unwrap();
    assert_eq!(first.sequence, 42);
}
1252
#[test]
fn should_reject_empty_bytes() {
    let message = Manifest::from_bytes(Bytes::new()).unwrap_err().to_string();

    assert!(message.contains("must not be empty"));
}
1259
#[test]
fn should_reject_bytes_too_short_for_footer() {
    // One byte short of a complete footer. Deriving the length from FOOTER_SIZE keeps this
    // test on the boundary if the footer layout ever changes.
    let truncated = Bytes::from(vec![0u8; FOOTER_SIZE - 1]);

    let err = Manifest::from_bytes(truncated).unwrap_err();

    assert!(
        err.to_string().contains("too short for footer"),
        "got: {}",
        err
    );
}
1266
#[test]
fn should_reject_wrong_version() {
    // Well-formed footer except for an unknown version number (99).
    let mut footer = BytesMut::with_capacity(FOOTER_SIZE);
    footer.put_u32_le(0); // entries_count
    footer.put_u64_le(0); // next_sequence
    footer.put_u64_le(0); // epoch
    footer.put_u16_le(99); // version

    let message = Manifest::from_bytes(footer.freeze())
        .unwrap_err()
        .to_string();

    assert!(message.contains("unsupported"));
    assert!(message.contains("99"));
}
1280
#[test]
fn should_reject_version_zero() {
    // Version 0 differs from MANIFEST_VERSION and must be rejected like any unknown value.
    let mut footer = BytesMut::with_capacity(FOOTER_SIZE);
    footer.put_u32_le(0); // entries_count
    footer.put_u64_le(0); // next_sequence
    footer.put_u64_le(0); // epoch
    footer.put_u16_le(0); // version

    let message = Manifest::from_bytes(footer.freeze())
        .unwrap_err()
        .to_string();

    assert!(message.contains("unsupported"));
}
1293
#[test]
fn should_make_appended_entry_accessible_via_iter() {
    let mut manifest = Manifest::empty();
    let expected_metadata = vec![meta(0, 42, "meta")];

    manifest
        .append(&entry("loc", expected_metadata.clone()))
        .unwrap();

    let decoded: Vec<QueueEntry> = manifest.iter().map(|item| item.unwrap()).collect();
    assert_eq!(decoded.len(), 1);
    let only = &decoded[0];
    assert_eq!(only.sequence, 0);
    assert_eq!(only.location, "loc");
    assert_eq!(only.metadata, expected_metadata);
}
1306
#[test]
fn should_append_to_existing_base_entries() {
    let serialized_base = Manifest::from_entries(&[entry_seq(0, "base", vec![])])
        .to_bytes()
        .unwrap();
    let mut manifest = Manifest::from_bytes(serialized_base).unwrap();

    manifest.append(&entry("appended", vec![])).unwrap();

    assert_eq!(manifest.entries_count(), 2);
    assert_eq!(collect_locations(&manifest), vec!["base", "appended"]);
    // The appended entry continues the base manifest's sequence numbering.
    let sequences: Vec<u64> = manifest.iter().map(|item| item.unwrap().sequence).collect();
    assert_eq!(sequences, vec![0, 1]);
}
1321
#[test]
fn should_preserve_append_order() {
    let mut manifest = Manifest::empty();

    for location in ["a", "b", "c"] {
        manifest.append(&entry(location, vec![])).unwrap();
    }

    assert_eq!(collect_locations(&manifest), vec!["a", "b", "c"]);
    let sequences: Vec<u64> = manifest.iter().map(|item| item.unwrap().sequence).collect();
    assert_eq!(sequences, vec![0, 1, 2]);
}
1336
#[test]
fn should_handle_entry_with_empty_location() {
    let manifest = Manifest::from_entries(&[entry_seq(0, "", vec![])]);

    let first = manifest.iter().next().unwrap().unwrap();
    assert_eq!(first.location, "");
    assert!(first.metadata.is_empty());
}
1345
#[test]
fn should_handle_entry_with_large_metadata() {
    // A 1 KiB payload, much larger than the short strings used by the other tests.
    let payload = Bytes::from(vec![0xAB_u8; 1024]);
    let record = Metadata {
        start_index: 0,
        ingestion_time_ms: 1,
        payload: payload.clone(),
    };
    let manifest = Manifest::from_entries(&[entry_seq(0, "loc", vec![record])]);

    let decoded: Vec<QueueEntry> = manifest.iter().map(|item| item.unwrap()).collect();
    let metadata = &decoded[0].metadata;
    assert_eq!(metadata.len(), 1);
    assert_eq!(metadata[0].payload, payload);
}
1364
#[test]
fn should_handle_negative_ingestion_time() {
    // `ingestion_time_ms` is an i64, so negative values must round-trip unchanged.
    let manifest = Manifest::from_entries(&[entry_seq(0, "loc", vec![meta(0, -1000, "")])]);

    let first = manifest.iter().next().unwrap().unwrap();
    assert_eq!(first.metadata[0].ingestion_time_ms, -1000);
}
1372
#[test]
fn should_return_footer_for_empty_manifest() {
    let encoded = Manifest::empty().to_bytes().unwrap();

    assert_eq!(encoded.len(), FOOTER_SIZE);

    // Walk the footer field by field: entries_count, next_sequence, epoch, version.
    let (count_bytes, rest) = encoded.split_at(4);
    let (sequence_bytes, rest) = rest.split_at(8);
    let (epoch_bytes, version_bytes) = rest.split_at(8);

    assert_eq!(u32::from_le_bytes(count_bytes.try_into().unwrap()), 0);
    assert_eq!(u64::from_le_bytes(sequence_bytes.try_into().unwrap()), 0);
    assert_eq!(u64::from_le_bytes(epoch_bytes.try_into().unwrap()), 0);
    assert_eq!(
        u16::from_le_bytes(version_bytes.try_into().unwrap()),
        MANIFEST_VERSION
    );
}
1388
#[test]
fn should_merge_base_and_appended() {
    let base_bytes = Manifest::from_entries(&[entry_seq(0, "base", vec![])])
        .to_bytes()
        .unwrap();
    let mut merged = Manifest::from_bytes(base_bytes).unwrap();
    merged.append(&entry("appended", vec![])).unwrap();

    // Serializing must fold the appended entry into the same byte stream as the base entry.
    let reparsed = Manifest::from_bytes(merged.to_bytes().unwrap()).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
    let sequences: Vec<u64> = reparsed.iter().map(|item| item.unwrap().sequence).collect();
    assert_eq!(sequences, vec![0, 1]);
}
1404
#[test]
fn should_write_correct_footer_count() {
    // Two base entries plus three appended ones: the footer must describe all five.
    let base = Manifest::from_entries(&[entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])]);
    let mut manifest = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
    for location in ["c", "d", "e"] {
        manifest.append(&entry(location, vec![])).unwrap();
    }

    let encoded = manifest.to_bytes().unwrap();

    // The footer occupies the final FOOTER_SIZE bytes of the serialized manifest.
    let footer = &encoded[encoded.len() - FOOTER_SIZE..];
    let count = u32::from_le_bytes(footer[0..4].try_into().unwrap());
    let next_seq = u64::from_le_bytes(footer[4..12].try_into().unwrap());
    let epoch = u64::from_le_bytes(footer[12..20].try_into().unwrap());
    let version = u16::from_le_bytes(footer[20..].try_into().unwrap());

    assert_eq!(count, 5);
    assert_eq!(next_seq, 5);
    assert_eq!(epoch, 0);
    assert_eq!(version, MANIFEST_VERSION);
}
1433
#[test]
fn should_round_trip_from_entries_to_bytes_from_bytes() {
    let originals = vec![
        entry_seq(0, "a", vec![meta(0, 10, "m1")]),
        entry_seq(1, "b", vec![meta(0, 20, "m2")]),
    ];
    let encoded = Manifest::from_entries(&originals).to_bytes().unwrap();

    let reparsed = Manifest::from_bytes(encoded).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    let decoded: Vec<QueueEntry> = reparsed.iter().map(|item| item.unwrap()).collect();
    // Every field of every input entry must survive serialization unchanged.
    for (index, want) in originals.iter().enumerate() {
        let got = &decoded[index];
        assert_eq!(got.sequence, want.sequence);
        assert_eq!(got.location, want.location);
        assert_eq!(got.metadata, want.metadata);
    }
}
1453
#[test]
fn should_round_trip_append_serialize_reparse() {
    let mut manifest = Manifest::empty();
    manifest.append(&entry("x", vec![meta(0, 100, "data")])).unwrap();
    manifest.append(&entry("y", vec![meta(0, 200, "more")])).unwrap();

    let reparsed = Manifest::from_bytes(manifest.to_bytes().unwrap()).unwrap();

    assert_eq!(reparsed.entries_count(), 2);
    assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);

    // Locations alone don't prove the appended entries survived intact. Also verify the
    // sequences assigned at append time and the metadata records.
    let decoded: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
    assert_eq!(decoded[0].sequence, 0);
    assert_eq!(decoded[0].metadata, vec![meta(0, 100, "data")]);
    assert_eq!(decoded[1].sequence, 1);
    assert_eq!(decoded[1].metadata, vec![meta(0, 200, "more")]);
}
1465
#[test]
fn should_chain_serialize_reparse_append() {
    // Start from a one-entry manifest. Each round appends one entry and then rebuilds the
    // manifest from its own serialized bytes.
    let mut current = Manifest::from_bytes(
        Manifest::from_entries(&[entry_seq(0, "a", vec![])])
            .to_bytes()
            .unwrap(),
    )
    .unwrap();
    for location in ["b", "c"] {
        current.append(&entry(location, vec![])).unwrap();
        current = Manifest::from_bytes(current.to_bytes().unwrap()).unwrap();
    }

    assert_eq!(current.entries_count(), 3);
    assert_eq!(collect_locations(&current), vec!["a", "b", "c"]);
    let last = current.iter().nth(2).unwrap().unwrap();
    assert_eq!(last.sequence, 2);
}
1482
#[test]
fn should_dequeue_entries_through_sequence() {
    let mut manifest = Manifest::empty();
    for _ in 0..5 {
        manifest.append(&entry("loc", vec![])).unwrap();
    }

    // dequeue(2) removes every entry up to and including sequence 2.
    let removed = manifest.dequeue(2).unwrap();

    let removed_sequences: Vec<u64> = removed.iter().map(|e| e.sequence).collect();
    assert_eq!(removed_sequences, vec![0, 1, 2]);
    assert_eq!(manifest.entries_count(), 2);
    let remaining_sequences: Vec<u64> =
        manifest.iter().map(|item| item.unwrap().sequence).collect();
    assert_eq!(remaining_sequences, vec![3, 4]);
    assert_eq!(manifest.next_sequence, 5);
}
1502
#[test]
fn should_dequeue_all_entries() {
    let mut manifest = Manifest::empty();
    for _ in 0..3 {
        manifest.append(&entry("loc", vec![])).unwrap();
    }

    // Dequeuing through the last sequence drains the manifest but leaves next_sequence as is.
    let removed = manifest.dequeue(2).unwrap();

    assert_eq!(removed.len(), 3);
    assert!(manifest.is_empty());
    assert_eq!(manifest.next_sequence, 3);
}
1516
#[test]
fn should_dequeue_nothing_when_sequence_below_first() {
    // Entries start at sequence 5, so dequeuing through 3 matches none of them.
    let mut manifest = Manifest::from_entries(&[
        entry_seq(5, "a", vec![]),
        entry_seq(6, "b", vec![]),
        entry_seq(7, "c", vec![]),
    ]);

    let removed = manifest.dequeue(3).unwrap();

    assert!(removed.is_empty());
    assert_eq!(manifest.entries_count(), 3);
}
1531
#[test]
fn should_append_after_dequeue() {
    let mut manifest = Manifest::empty();
    for _ in 0..3 {
        manifest.append(&entry("loc", vec![])).unwrap();
    }

    manifest.dequeue(0).unwrap();

    assert_eq!(manifest.entries_count(), 2);
    let survivors: Vec<u64> = manifest.iter().map(|item| item.unwrap().sequence).collect();
    assert_eq!(survivors, vec![1, 2]);

    // The next append continues from next_sequence (3) rather than reusing a dequeued value.
    manifest.append(&entry("new", vec![])).unwrap();
    let all: Vec<QueueEntry> = manifest.iter().map(|item| item.unwrap()).collect();
    assert_eq!(all.len(), 3);
    assert_eq!(all[2].sequence, 3);
}
1551
1552 fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1554 let mut buf = BytesMut::new();
1555 Manifest::encode_entry(&mut buf, entry).unwrap();
1556 buf.to_vec()
1557 }
1558
1559 fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1562 let mut buf = BytesMut::new();
1563 buf.extend_from_slice(entry_bytes);
1564 buf.put_u32_le(1); buf.put_u64_le(1); buf.put_u64_le(0); buf.put_u16_le(MANIFEST_VERSION);
1568 Manifest::from_bytes(buf.freeze()).unwrap()
1569 }
1570
1571 const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;
1574
1575 fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
1579 let e = QueueEntry {
1580 sequence: 1,
1581 location: "a".to_string(),
1582 metadata: vec![],
1583 };
1584 let mut raw = encode_entry_bytes(&e);
1585 raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
1587 raw.extend_from_slice(extra_bytes);
1589 let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
1591 raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
1592 raw
1593 }
1594
#[test]
fn should_reject_trailing_bytes_before_footer() {
    // A valid entry followed by 5 garbage bytes sitting between it and the footer.
    let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
    raw.extend_from_slice(&[0xFF; 5]);
    let manifest = manifest_from_raw_entry(&raw);

    let items: Vec<Result<QueueEntry>> = manifest.iter().collect();
    assert_eq!(items.len(), 2);
    let (first, second) = (&items[0], &items[1]);
    assert!(first.is_ok());
    let err = second.as_ref().unwrap_err();
    assert!(
        err.to_string().contains("did not consume all bytes"),
        "got: {}",
        err
    );
}
1612
#[test]
fn should_reject_entry_with_entry_len_below_minimum() {
    // The smallest entry (empty location, no metadata) needs sequence + location length +
    // metadata count bytes; declare one byte fewer.
    let bad_entry_len = (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE - 1) as u32;
    let mut raw = Vec::new();
    raw.extend_from_slice(&bad_entry_len.to_le_bytes());
    // Zero-filled body of exactly the declared length, so its location-length field reads 0.
    raw.resize(raw.len() + bad_entry_len as usize, 0u8);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    assert!(
        err.to_string().contains(
            "entry length 13 is less than minimum entry length 14 for the length of the location 0"
        ),
        "got: {}",
        err
    );
}
1627
#[test]
fn should_reject_entry_with_entry_len_below_minimum_for_location() {
    let loc = "abc";
    // One byte short of sequence + location length + metadata count + the location bytes.
    let declared_len =
        (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + loc.len() - 1) as u32;

    let mut raw = Vec::new();
    raw.extend_from_slice(&declared_len.to_le_bytes()); // entry length prefix
    raw.extend_from_slice(&0u64.to_le_bytes()); // sequence
    raw.extend_from_slice(&(loc.len() as u16).to_le_bytes()); // location length
    raw.extend_from_slice(&[0u8; 20]); // padding so the buffer itself is not truncated
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("entry length 16 is less than minimum entry length 17"),
        "got: {}",
        err
    );
}
1649
#[test]
fn should_reject_entry_truncated_before_entry_len() {
    // Two bytes cannot hold the entry-length prefix.
    let manifest = manifest_from_raw_entry(&[0u8; 2]);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let is_expected = matches!(
        &err,
        Error::Serialization(msg) if msg.contains("entry length field does not fit")
    );
    assert!(is_expected, "unexpected error: {err}");
}
1660
#[test]
fn should_reject_entry_truncated_before_metadata_start_index() {
    // Claims one metadata record but supplies no bytes for it.
    let raw = corrupt_metadata_entry(1, &[]);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let is_expected = matches!(
        &err,
        Error::Serialization(msg) if msg.contains("start index field does not fit")
    );
    assert!(is_expected, "unexpected error: {err}");
}
1671
#[test]
fn should_reject_entry_truncated_before_metadata_ingestion_time() {
    // One record claimed; only its start index is present.
    let start_index = 0u32.to_le_bytes();
    let raw = corrupt_metadata_entry(1, &start_index);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let is_expected = matches!(
        &err,
        Error::Serialization(msg) if msg.contains("ingestion time field does not fit")
    );
    assert!(is_expected, "unexpected error: {err}");
}
1682
#[test]
fn should_reject_entry_truncated_before_metadata_length() {
    // One record claimed; its start index and ingestion time are present, its length is not.
    let extra = [&0u32.to_le_bytes()[..], &0i64.to_le_bytes()[..]].concat();
    let raw = corrupt_metadata_entry(1, &extra);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let is_expected = matches!(
        &err,
        Error::Serialization(msg) if msg.contains("metadata length field does not fit")
    );
    assert!(is_expected, "unexpected error: {err}");
}
1696
#[test]
fn should_reject_entry_truncated_before_metadata_payload() {
    // One record declaring a 10-byte payload, of which only two bytes are present.
    let extra = [
        &0u32.to_le_bytes()[..],  // start index
        &0i64.to_le_bytes()[..],  // ingestion time
        &10u32.to_le_bytes()[..], // metadata length
        &[0xAB, 0xCD][..],        // truncated payload
    ]
    .concat();
    let raw = corrupt_metadata_entry(1, &extra);
    let manifest = manifest_from_raw_entry(&raw);

    let err = manifest.iter().next().unwrap().unwrap_err();
    let is_expected = matches!(
        &err,
        Error::Serialization(msg) if msg.contains("metadata has less bytes than set")
    );
    assert!(is_expected, "unexpected error: {err}");
}
1712
1713 #[tokio::test]
1714 async fn should_reject_location_exceeding_u16_max() {
1715 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1716 let producer =
1717 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1718
1719 let long_location = "x".repeat(u16::MAX as usize + 1);
1720 let result = producer.enqueue(long_location, vec![]).await;
1721 assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1722 }
1723}