1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use bytes::{BufMut, Bytes, BytesMut};
5use slatedb::object_store::path::Path;
6use slatedb::object_store::{
7 Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
8};
9
10use crate::error::{Error, Result};
11
/// Serialization format version written into (and checked against) the footer.
const MANIFEST_VERSION: u16 = 1;
/// Sentinel epoch for a consumer that has not called `initialize` yet.
/// `QueueConsumer::initialize` skips this value when bumping the epoch,
/// so a live manifest can never carry it and reads are always fenced.
const UNINITIALIZED_EPOCH: u64 = u64::MAX;
// Byte widths of the fixed-size fields in the on-disk encoding
// (all integers are little-endian).
const ENTRY_LEN_SIZE: usize = 4;
const LOCATION_LEN_SIZE: usize = 2;
const INGESTION_TIME_MS_SIZE: usize = 8;
const METADATA_LEN_SIZE: usize = 4;
const START_INDEX_SIZE: usize = 4;
const METADATA_COUNT_SIZE: usize = 4;
const ENTRIES_COUNT_SIZE: usize = 4;
const SEQUENCE_SIZE: usize = 8;
const EPOCH_SIZE: usize = 8;
const VERSION_SIZE: usize = 2;
/// Footer layout: entries count (u32), next sequence (u64), epoch (u64),
/// format version (u16) — always the trailing bytes of a manifest.
const FOOTER_SIZE: usize = ENTRIES_COUNT_SIZE + SEQUENCE_SIZE + EPOCH_SIZE + VERSION_SIZE;
25
/// User-supplied metadata item attached to a queue entry.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    // Presumably the index of the first record this item covers —
    // TODO(review): confirm semantics with the producer side.
    pub start_index: u32,
    // Ingestion timestamp in milliseconds (signed; epoch base not
    // established by this file — confirm with callers).
    pub ingestion_time_ms: i64,
    // Opaque payload; must serialize to at most u32::MAX bytes
    // (enforced in `Manifest::encode_entry`).
    pub payload: Bytes,
}
36
/// One queued item: a location string plus its metadata, stamped with a
/// monotonically increasing sequence number when appended to a manifest.
#[derive(Debug, Clone)]
pub(crate) struct QueueEntry {
    // Assigned by `Manifest::append`; 0 until then (see `QueueEntry::new`).
    pub(crate) sequence: u64,
    // At most u16::MAX bytes (checked in `QueueEntry::new`).
    pub(crate) location: String,
    // At most u32::MAX items (checked in `QueueEntry::new`).
    pub(crate) metadata: Vec<Metadata>,
}
43
44impl QueueEntry {
45 fn new(location: String, metadata: Vec<Metadata>) -> Result<Self> {
46 if location.len() > u16::MAX as usize {
47 return Err(Error::InvalidInput(format!(
48 "location length {} exceeds u16::MAX",
49 location.len()
50 )));
51 }
52 if metadata.len() > u32::MAX as usize {
53 return Err(Error::InvalidInput(format!(
54 "metadata count {} exceeds u32::MAX",
55 metadata.len()
56 )));
57 }
58 Ok(Self {
59 sequence: 0,
60 location,
61 metadata,
62 })
63 }
64
65 fn clone_with_sequence(&self, sequence: u64) -> Self {
66 Self {
67 sequence,
68 ..self.clone()
69 }
70 }
71}
72
/// In-memory view of the queue manifest: the committed, already
/// serialized bytes (`data`, ending in a footer) plus entries appended
/// locally since the last serialization (`appended`).
#[derive(Debug, Clone)]
pub(crate) struct Manifest {
    // Serialized committed entries followed by the footer.
    data: Bytes,
    // Serialized entries appended locally, not yet merged into `data`.
    appended: BytesMut,
    // Number of entries encoded in `appended`.
    appended_count: usize,
    // Sequence number the next appended entry will receive.
    next_sequence: u64,
    // Fencing epoch; mirrors the value encoded in the footer of `data`.
    epoch: u64,
}
81
82impl Manifest {
83 fn empty() -> Self {
85 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
86 buf.put_u32_le(0);
87 buf.put_u64_le(0);
88 buf.put_u64_le(0);
89 buf.put_u16_le(MANIFEST_VERSION);
90 Self {
91 data: buf.freeze(),
92 appended: BytesMut::new(),
93 appended_count: 0,
94 next_sequence: 0,
95 epoch: 0,
96 }
97 }
98
99 pub(crate) fn from_bytes(data: Bytes) -> Result<Self> {
101 if data.is_empty() {
102 return Err(Error::Serialization(
103 "queue manifest data must not be empty".to_string(),
104 ));
105 }
106 if data.len() < FOOTER_SIZE {
107 return Err(Error::Serialization(
108 "queue manifest too short for footer".to_string(),
109 ));
110 }
111 let version_start = data.len() - VERSION_SIZE;
112 let version = u16::from_le_bytes(data[version_start..].try_into().unwrap());
113 if version != MANIFEST_VERSION {
114 return Err(Error::Serialization(format!(
115 "unsupported queue manifest version: {}",
116 version
117 )));
118 }
119 let epoch_start = data.len() - VERSION_SIZE - EPOCH_SIZE;
120 let epoch = u64::from_le_bytes(
121 data[epoch_start..epoch_start + EPOCH_SIZE]
122 .try_into()
123 .unwrap(),
124 );
125 let next_seq_start = data.len() - VERSION_SIZE - EPOCH_SIZE - SEQUENCE_SIZE;
126 let next_sequence = u64::from_le_bytes(
127 data[next_seq_start..next_seq_start + SEQUENCE_SIZE]
128 .try_into()
129 .unwrap(),
130 );
131 Ok(Self {
132 data,
133 appended: BytesMut::new(),
134 appended_count: 0,
135 next_sequence,
136 epoch,
137 })
138 }
139
140 #[cfg(test)]
142 fn from_entries(entries: &[QueueEntry]) -> Self {
143 let next_sequence = entries.iter().map(|e| e.sequence + 1).max().unwrap_or(0);
144 let mut buf = BytesMut::new();
145 for entry in entries {
146 Self::encode_entry(&mut buf, entry).unwrap();
147 }
148 buf.put_u32_le(entries.len() as u32);
149 buf.put_u64_le(next_sequence);
150 buf.put_u64_le(0);
151 buf.put_u16_le(MANIFEST_VERSION);
152 Self {
153 data: buf.freeze(),
154 appended: BytesMut::new(),
155 appended_count: 0,
156 next_sequence,
157 epoch: 0,
158 }
159 }
160
161 fn entries_count(&self) -> usize {
163 let base = self.existing_entries_count();
164 base + self.appended_count
165 }
166
167 #[cfg(test)]
169 fn is_empty(&self) -> bool {
170 self.entries_count() == 0
171 }
172
173 pub(crate) fn iter(&self) -> ManifestIter<'_> {
175 let base_count = self.existing_entries_count();
176 let entries_end = if self.data.is_empty() {
177 0
178 } else {
179 self.data.len() - FOOTER_SIZE
180 };
181 ManifestIter {
182 data: &self.data,
183 offset: 0,
184 remaining: base_count,
185 entries_end,
186 appended: &self.appended,
187 appended_offset: 0,
188 appended_remaining: self.appended_count,
189 }
190 }
191
192 fn existing_entries_count(&self) -> usize {
193 if self.data.is_empty() {
194 0
195 } else {
196 let footer_start = self.data.len() - FOOTER_SIZE;
197 u32::from_le_bytes(
198 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
199 .try_into()
200 .unwrap(),
201 ) as usize
202 }
203 }
204
205 fn append(&mut self, entry: &QueueEntry) -> Result<()> {
209 let sequenced = entry.clone_with_sequence(self.next_sequence);
210 Self::encode_entry(&mut self.appended, &sequenced)?;
211 self.next_sequence += 1;
212 self.appended_count += 1;
213 Ok(())
214 }
215
216 fn dequeue(&mut self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
221 let next_seq = self.next_sequence;
222 let epoch = self.epoch;
223
224 let base_count = self.existing_entries_count();
225 let entries_end = if self.data.is_empty() {
226 0
227 } else {
228 self.data.len() - FOOTER_SIZE
229 };
230
231 let (mut removed, remaining_base_start, remaining_base_count) =
232 split_entries(&self.data, base_count, entries_end, through_sequence)?;
233
234 let appended_end = self.appended.len();
235 let (appended_removed, remaining_appended_start, remaining_appended_count) = split_entries(
236 &self.appended,
237 self.appended_count,
238 appended_end,
239 through_sequence,
240 )?;
241 removed.extend(appended_removed);
242
243 let remaining_base_bytes = &self.data[remaining_base_start..entries_end];
244 let remaining_appended_bytes = &self.appended[remaining_appended_start..appended_end];
245 let total_remaining = remaining_base_count + remaining_appended_count;
246
247 let mut buf = BytesMut::with_capacity(
248 remaining_base_bytes.len() + remaining_appended_bytes.len() + FOOTER_SIZE,
249 );
250 buf.extend_from_slice(remaining_base_bytes);
251 buf.extend_from_slice(remaining_appended_bytes);
252 buf.put_u32_le(total_remaining);
253 buf.put_u64_le(next_seq);
254 buf.put_u64_le(epoch);
255 buf.put_u16_le(MANIFEST_VERSION);
256
257 self.data = buf.freeze();
258 self.appended = BytesMut::new();
259 self.appended_count = 0;
260 self.next_sequence = next_seq;
261 self.epoch = epoch;
262
263 Ok(removed)
264 }
265
266 fn set_epoch(&mut self, epoch: u64) {
268 self.epoch = epoch;
269 let mut buf = BytesMut::from(self.data.as_ref());
270 let epoch_start = buf.len() - VERSION_SIZE - EPOCH_SIZE;
271 buf[epoch_start..epoch_start + EPOCH_SIZE].copy_from_slice(&epoch.to_le_bytes());
272 self.data = buf.freeze();
273 }
274
275 fn to_bytes(&self) -> Result<Bytes> {
279 if self.appended.is_empty() {
280 return Ok(self.data.clone());
281 }
282 let (prefix, base_count) = if self.data.is_empty() {
283 (&[] as &[u8], 0u32)
284 } else {
285 let footer_start = self.data.len() - FOOTER_SIZE;
286 let count = u32::from_le_bytes(
287 self.data[footer_start..footer_start + ENTRIES_COUNT_SIZE]
288 .try_into()
289 .unwrap(),
290 );
291 (&self.data[..footer_start], count)
292 };
293 let total_count: u32 = base_count
294 .checked_add(self.appended_count as u32)
295 .ok_or_else(|| {
296 Error::Serialization(format!(
297 "total entry count consisting of {} existing entries + {} appended entries exceeds u32::MAX",
298 base_count, self.appended_count
299 ))
300 })?;
301 let mut buf = BytesMut::with_capacity(prefix.len() + self.appended.len() + FOOTER_SIZE);
302 buf.extend_from_slice(prefix);
303 buf.extend_from_slice(&self.appended);
304 buf.put_u32_le(total_count);
305 buf.put_u64_le(self.next_sequence);
306 buf.put_u64_le(self.epoch);
307 buf.put_u16_le(MANIFEST_VERSION);
308 Ok(buf.freeze())
309 }
310
311 fn encode_entry(buf: &mut BytesMut, entry: &QueueEntry) -> Result<()> {
312 debug_assert!(entry.location.len() <= u16::MAX as usize);
313 let metadata_size: usize = METADATA_COUNT_SIZE
314 + entry
315 .metadata
316 .iter()
317 .map(|m| {
318 START_INDEX_SIZE + INGESTION_TIME_MS_SIZE + METADATA_LEN_SIZE + m.payload.len()
319 })
320 .sum::<usize>();
321 let entry_body_len =
322 SEQUENCE_SIZE + LOCATION_LEN_SIZE + entry.location.len() + metadata_size;
323 debug_assert!(entry_body_len <= u32::MAX as usize);
324 buf.put_u32_le(entry_body_len as u32);
325 buf.put_u64_le(entry.sequence);
326 buf.put_u16_le(entry.location.len() as u16);
327 buf.extend_from_slice(entry.location.as_bytes());
328 debug_assert!(entry.metadata.len() <= u32::MAX as usize);
329 buf.put_u32_le(entry.metadata.len() as u32);
330 for m in &entry.metadata {
331 if m.payload.len() > u32::MAX as usize {
332 return Err(Error::InvalidInput(format!(
333 "metadata payload size {} exceeds u32::MAX",
334 m.payload.len()
335 )));
336 }
337 buf.put_u32_le(m.start_index);
338 buf.put_i64_le(m.ingestion_time_ms);
339 buf.put_u32_le(m.payload.len() as u32);
340 buf.extend_from_slice(&m.payload);
341 }
342 Ok(())
343 }
344}
345
346fn split_entries(
350 data: &[u8],
351 count: usize,
352 end: usize,
353 through_sequence: u64,
354) -> Result<(Vec<QueueEntry>, usize, u32)> {
355 let mut removed = Vec::new();
356 let mut offset = 0usize;
357
358 for i in 0..count {
359 let entry_start = offset;
360 let entry = decode_entry(data, &mut offset, end)?;
361
362 if entry.sequence <= through_sequence {
363 removed.push(entry);
364 } else {
365 return Ok((removed, entry_start, (count - i) as u32));
366 }
367 }
368
369 Ok((removed, end, 0))
370}
371
372fn decode_entry(data: &[u8], offset: &mut usize, end: usize) -> Result<QueueEntry> {
374 if *offset + ENTRY_LEN_SIZE > end {
375 return Err(Error::Serialization(
376 "queue entry corrupt: size of entry length field does not fit in entry".to_string(),
377 ));
378 }
379
380 let entry_len =
381 u32::from_le_bytes(data[*offset..*offset + ENTRY_LEN_SIZE].try_into().unwrap()) as usize;
382 *offset += ENTRY_LEN_SIZE;
383
384 if *offset + entry_len > end {
385 return Err(Error::Serialization(
386 "queue entry corrupt: entry has less bytes than set in the entry length".to_string(),
387 ));
388 }
389
390 let entry_end = *offset + entry_len;
391
392 let sequence = u64::from_le_bytes(data[*offset..*offset + SEQUENCE_SIZE].try_into().unwrap());
393 *offset += SEQUENCE_SIZE;
394
395 let location_len = u16::from_le_bytes(
396 data[*offset..*offset + LOCATION_LEN_SIZE]
397 .try_into()
398 .unwrap(),
399 ) as usize;
400 *offset += LOCATION_LEN_SIZE;
401
402 let min_entry_len = SEQUENCE_SIZE + LOCATION_LEN_SIZE + location_len + METADATA_COUNT_SIZE;
403 if entry_len < min_entry_len {
404 return Err(Error::Serialization(format!(
405 "queue entry corrupt: entry length {} is less than minimum entry length {} for the length of the location {}",
406 entry_len, min_entry_len, location_len
407 )));
408 }
409
410 let location = String::from_utf8(data[*offset..*offset + location_len].to_vec())
411 .map_err(|e| Error::Serialization(e.to_string()))?;
412 *offset += location_len;
413
414 let metadata_count = u32::from_le_bytes(
415 data[*offset..*offset + METADATA_COUNT_SIZE]
416 .try_into()
417 .unwrap(),
418 ) as usize;
419 *offset += METADATA_COUNT_SIZE;
420
421 let mut metadata = Vec::with_capacity(metadata_count);
422 for _ in 0..metadata_count {
423 if *offset + START_INDEX_SIZE > end {
424 return Err(Error::Serialization(
425 "queue entry corrupt: size of start index field does not fit in entry".to_string(),
426 ));
427 }
428 let start_index = u32::from_le_bytes(
429 data[*offset..*offset + START_INDEX_SIZE]
430 .try_into()
431 .unwrap(),
432 );
433 *offset += START_INDEX_SIZE;
434
435 if *offset + INGESTION_TIME_MS_SIZE > end {
436 return Err(Error::Serialization(
437 "queue entry corrupt: size of ingestion time field does not fit in entry"
438 .to_string(),
439 ));
440 }
441 let ingestion_time_ms = i64::from_le_bytes(
442 data[*offset..*offset + INGESTION_TIME_MS_SIZE]
443 .try_into()
444 .unwrap(),
445 );
446 *offset += INGESTION_TIME_MS_SIZE;
447
448 if *offset + METADATA_LEN_SIZE > end {
449 return Err(Error::Serialization(
450 "queue entry corrupt: size of metadata length field does not fit in entry"
451 .to_string(),
452 ));
453 }
454 let m_len = u32::from_le_bytes(
455 data[*offset..*offset + METADATA_LEN_SIZE]
456 .try_into()
457 .unwrap(),
458 ) as usize;
459 *offset += METADATA_LEN_SIZE;
460
461 if *offset + m_len > end {
462 return Err(Error::Serialization(
463 "queue entry corrupt: metadata has less bytes than set in the metadata length"
464 .to_string(),
465 ));
466 }
467 metadata.push(Metadata {
468 start_index,
469 ingestion_time_ms,
470 payload: Bytes::copy_from_slice(&data[*offset..*offset + m_len]),
471 });
472 *offset += m_len;
473 }
474
475 *offset = entry_end;
476
477 Ok(QueueEntry {
478 sequence,
479 location,
480 metadata,
481 })
482}
483
/// Lazy iterator over a manifest's entries: committed entries from
/// `data` within `[offset, entries_end)` first, then uncommitted
/// entries from `appended`. Yields `Result` because decoding happens
/// on the fly.
pub(crate) struct ManifestIter<'a> {
    // Serialized committed entries (footer included; bounded by `entries_end`).
    data: &'a [u8],
    // Current decode position within `data`.
    offset: usize,
    // Committed entries not yet yielded.
    remaining: usize,
    // Byte offset where the footer of `data` begins.
    entries_end: usize,
    // Serialized uncommitted entries (no footer).
    appended: &'a [u8],
    // Current decode position within `appended`.
    appended_offset: usize,
    // Uncommitted entries not yet yielded.
    appended_remaining: usize,
}
494
495impl Iterator for ManifestIter<'_> {
496 type Item = Result<QueueEntry>;
497
498 fn next(&mut self) -> Option<Self::Item> {
499 if self.remaining > 0 {
500 self.remaining -= 1;
501 Some(decode_entry(self.data, &mut self.offset, self.entries_end))
502 } else if self.appended_remaining > 0 {
503 self.appended_remaining -= 1;
504 Some(decode_entry(
505 self.appended,
506 &mut self.appended_offset,
507 self.appended.len(),
508 ))
509 } else if self.offset != self.entries_end {
510 let err = Some(Err(Error::Serialization(format!(
511 "base entries did not consume all bytes: offset {} != entries_end {}",
512 self.offset, self.entries_end
513 ))));
514 self.offset = self.entries_end;
515 err
516 } else {
517 None
518 }
519 }
520}
521
/// Outcome classification for a manifest compare-and-swap write.
enum ManifestWriteError {
    // Another writer won the race (precondition / already-exists);
    // callers retry the read-modify-write loop.
    Conflict,
    // Any other failure; propagated to the caller without retry.
    Fatal(Error),
}
526
/// Binds an object store to the fixed path where the queue manifest lives.
#[derive(Clone)]
pub(crate) struct ManifestStore {
    pub(crate) object_store: Arc<dyn ObjectStore>,
    // Object-store key of the manifest (converted to `Path` per operation).
    pub(crate) manifest_path: String,
}
532
533impl ManifestStore {
534 pub(crate) async fn read(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
535 let path = Path::from(self.manifest_path.as_str());
536 match self.object_store.get(&path).await {
537 Ok(result) => {
538 let version = UpdateVersion {
539 e_tag: result.meta.e_tag.clone(),
540 version: result.meta.version.clone(),
541 };
542 let bytes = result
543 .bytes()
544 .await
545 .map_err(|e| Error::Storage(e.to_string()))?;
546 let manifest = Manifest::from_bytes(bytes)?;
547 Ok((manifest, Some(version)))
548 }
549 Err(ObjectStoreError::NotFound { .. }) => Ok((Manifest::empty(), None)),
550 Err(e) => Err(Error::Storage(e.to_string())),
551 }
552 }
553
554 async fn write(
555 &self,
556 manifest: &Manifest,
557 version: Option<UpdateVersion>,
558 ) -> std::result::Result<(), ManifestWriteError> {
559 let path = Path::from(self.manifest_path.as_str());
560 let put_mode = match version {
561 Some(v) => PutMode::Update(v),
562 None => PutMode::Create,
563 };
564 let data = manifest.to_bytes().map_err(ManifestWriteError::Fatal)?;
565
566 match self
567 .object_store
568 .put_opts(&path, PutPayload::from(data.to_vec()), put_mode.into())
569 .await
570 {
571 Ok(_) => Ok(()),
572 Err(ObjectStoreError::Precondition { .. })
573 | Err(ObjectStoreError::AlreadyExists { .. }) => Err(ManifestWriteError::Conflict),
574 Err(e) => Err(ManifestWriteError::Fatal(Error::Storage(e.to_string()))),
575 }
576 }
577}
578
/// Tracks CAS write attempts vs. conflicts for one role, mirroring the
/// counts into process-wide metrics.
struct ConflictCounter {
    // Total write attempts (incremented before each attempt).
    write_count: AtomicU64,
    // Attempts that lost the CAS race.
    conflict_count: AtomicU64,
    // Metric label, e.g. "producer" or "consumer".
    role: &'static str,
}
584
585impl ConflictCounter {
586 fn new(role: &'static str) -> Self {
587 Self {
588 write_count: AtomicU64::new(0),
589 conflict_count: AtomicU64::new(0),
590 role,
591 }
592 }
593
594 fn record_write(&self) {
595 self.write_count.fetch_add(1, Ordering::Relaxed);
596 metrics::counter!(crate::metric_names::MANIFEST_WRITES, "role" => self.role).increment(1);
597 }
598
599 fn record_conflict(&self) {
600 self.conflict_count.fetch_add(1, Ordering::Relaxed);
601 metrics::counter!(crate::metric_names::MANIFEST_CONFLICTS, "role" => self.role)
602 .increment(1);
603 }
604
605 fn conflict_rate(&self) -> f64 {
606 let writes = self.write_count.load(Ordering::Relaxed);
607 if writes == 0 {
608 return 0.0;
609 }
610 let conflicts = self.conflict_count.load(Ordering::Relaxed);
611 let rate = (conflicts as f64 / writes as f64) * 100.0;
612 rate.min(100.0)
613 }
614}
615
/// Appends entries to the shared queue manifest using optimistic
/// (compare-and-swap) writes; safe to use alongside other producers.
pub struct QueueProducer {
    manifest_store: ManifestStore,
    // Write/conflict statistics labeled "producer".
    counter: ConflictCounter,
}
625
626impl QueueProducer {
627 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
629 Self {
630 manifest_store: ManifestStore {
631 object_store,
632 manifest_path,
633 },
634 counter: ConflictCounter::new("producer"),
635 }
636 }
637
638 pub async fn enqueue(&self, location: String, metadata: Vec<Metadata>) -> Result<()> {
644 let entry = QueueEntry::new(location, metadata)?;
645 loop {
646 let (mut manifest, version) = self.manifest_store.read().await?;
647 manifest.append(&entry)?;
648 self.counter.record_write();
649 match self.manifest_store.write(&manifest, version).await {
650 Ok(()) => return Ok(()),
651 Err(ManifestWriteError::Conflict) => {
652 self.counter.record_conflict();
653 continue;
654 }
655 Err(ManifestWriteError::Fatal(e)) => return Err(e),
656 }
657 }
658 }
659
660 pub fn conflict_rate(&self) -> f64 {
662 self.counter.conflict_rate()
663 }
664}
665
/// Single-active-consumer view of the queue. Ownership is enforced via
/// the manifest epoch: `initialize` bumps it, and every read/dequeue
/// verifies the stored epoch still matches, failing with `Error::Fenced`
/// otherwise.
pub struct QueueConsumer {
    manifest_store: ManifestStore,
    // Epoch claimed by `initialize`; UNINITIALIZED_EPOCH until then.
    epoch: AtomicU64,
    // Write/conflict statistics labeled "consumer".
    counter: ConflictCounter,
    // Cached entry count from the last manifest read/successful write.
    queue_len: AtomicU64,
}
680
681impl QueueConsumer {
682 pub fn with_object_store(manifest_path: String, object_store: Arc<dyn ObjectStore>) -> Self {
686 Self {
687 manifest_store: ManifestStore {
688 object_store,
689 manifest_path,
690 },
691 epoch: AtomicU64::new(UNINITIALIZED_EPOCH),
692 counter: ConflictCounter::new("consumer"),
693 queue_len: AtomicU64::new(0),
694 }
695 }
696
697 pub async fn initialize(&self) -> Result<()> {
700 loop {
701 let (mut manifest, version) = self.read_manifest().await?;
702 let mut new_epoch = manifest.epoch.wrapping_add(1);
703 if new_epoch == UNINITIALIZED_EPOCH {
704 new_epoch = new_epoch.wrapping_add(1);
705 }
706 manifest.set_epoch(new_epoch);
707 match self.write_manifest(&manifest, version).await {
708 Ok(()) => {
709 self.epoch.store(new_epoch, Ordering::Relaxed);
710 return Ok(());
711 }
712 Err(ManifestWriteError::Conflict) => {
713 self.counter.record_conflict();
714 continue;
715 }
716 Err(ManifestWriteError::Fatal(e)) => return Err(e),
717 }
718 }
719 }
720
721 pub(crate) async fn peek(&self) -> Result<Option<QueueEntry>> {
724 let (manifest, _) = self.read_manifest().await?;
725 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
726 return Err(Error::Fenced);
727 }
728 manifest.iter().next().transpose()
729 }
730
731 pub(crate) async fn read(&self, sequence: u64) -> Result<Option<QueueEntry>> {
734 let (manifest, _) = self.read_manifest().await?;
735 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
736 return Err(Error::Fenced);
737 }
738 manifest
739 .iter()
740 .find(|e| matches!(e, Ok(e) if e.sequence == sequence))
741 .transpose()
742 }
743
744 pub(crate) async fn dequeue(&self, through_sequence: u64) -> Result<Vec<QueueEntry>> {
747 loop {
748 let (mut manifest, version) = self.read_manifest().await?;
749 if manifest.epoch != self.epoch.load(Ordering::Relaxed) {
750 return Err(Error::Fenced);
751 }
752 let removed = manifest.dequeue(through_sequence)?;
753 match self.write_manifest(&manifest, version).await {
754 Ok(()) => return Ok(removed),
755 Err(ManifestWriteError::Conflict) => {
756 self.counter.record_conflict();
757 continue;
758 }
759 Err(ManifestWriteError::Fatal(e)) => return Err(e),
760 }
761 }
762 }
763
764 pub fn len(&self) -> usize {
766 self.queue_len.load(Ordering::Relaxed) as usize
767 }
768
769 async fn read_manifest(&self) -> Result<(Manifest, Option<UpdateVersion>)> {
770 let result = self.manifest_store.read().await?;
771 self.queue_len
772 .store(result.0.entries_count() as u64, Ordering::Relaxed);
773 Ok(result)
774 }
775
776 async fn write_manifest(
777 &self,
778 manifest: &Manifest,
779 version: Option<UpdateVersion>,
780 ) -> std::result::Result<(), ManifestWriteError> {
781 self.counter.record_write();
782 let result = self.manifest_store.write(manifest, version).await;
783 if result.is_ok() {
784 self.queue_len
785 .store(manifest.entries_count() as u64, Ordering::Relaxed);
786 }
787 result
788 }
789
790 pub fn conflict_rate(&self) -> f64 {
792 self.counter.conflict_rate()
793 }
794}
795
/// Owned, public representation of one decoded queue entry, produced by
/// `parse_manifest`.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestEntry {
    pub sequence: u64,
    pub location: String,
    pub metadata: Vec<Metadata>,
}
803
/// Fully decoded, read-only snapshot of a serialized queue manifest.
#[derive(Debug, Clone)]
pub struct ManifestView {
    // Fencing epoch from the footer.
    pub epoch: u64,
    // Sequence number the next appended entry would receive.
    pub next_sequence: u64,
    // Decoded entries in queue order; exposed via `entries()`.
    entries: Vec<ManifestEntry>,
}
811
impl ManifestView {
    /// The decoded entries in queue order.
    pub fn entries(&self) -> &[ManifestEntry] {
        &self.entries
    }
}
818
819pub fn parse_manifest(data: Bytes) -> Result<ManifestView> {
821 let manifest = Manifest::from_bytes(data)?;
822 let entries = manifest
823 .iter()
824 .map(|r| {
825 r.map(|e| ManifestEntry {
826 sequence: e.sequence,
827 location: e.location,
828 metadata: e.metadata,
829 })
830 })
831 .collect::<Result<Vec<_>>>()?;
832 Ok(ManifestView {
833 epoch: manifest.epoch,
834 next_sequence: manifest.next_sequence,
835 entries,
836 })
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842 use slatedb::object_store::memory::InMemory;
843
844 const TEST_MANIFEST_PATH: &str = "test/manifest";
845
846 async fn read_producer_manifest(store: &Arc<dyn ObjectStore>, path: &str) -> Manifest {
847 let path = Path::from(path);
848 let result = store.get(&path).await.unwrap();
849 let bytes = result.bytes().await.unwrap();
850 Manifest::from_bytes(bytes).unwrap()
851 }
852
853 #[tokio::test]
854 async fn should_initialize_consumer_and_increment_epoch() {
855 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
856 let consumer =
857 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
858
859 consumer.initialize().await.unwrap();
860
861 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
862 assert_eq!(manifest.epoch, 1);
863 }
864
865 #[tokio::test]
866 async fn should_peek_none_when_queue_is_empty() {
867 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
868 let consumer =
869 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
870 consumer.initialize().await.unwrap();
871
872 let result = consumer.peek().await.unwrap();
873 assert!(result.is_none());
874 }
875
876 #[tokio::test]
877 async fn should_read_entry_by_sequence() {
878 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
879 let producer =
880 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
881
882 producer
883 .enqueue("a.batch".to_string(), vec![])
884 .await
885 .unwrap();
886 producer
887 .enqueue("b.batch".to_string(), vec![])
888 .await
889 .unwrap();
890 producer
891 .enqueue("c.batch".to_string(), vec![])
892 .await
893 .unwrap();
894
895 let consumer =
896 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
897 consumer.initialize().await.unwrap();
898
899 let entry = consumer.read(1).await.unwrap().unwrap();
900 assert_eq!(entry.location, "b.batch");
901 assert_eq!(entry.sequence, 1);
902 }
903
904 #[tokio::test]
905 async fn should_read_none_for_missing_sequence() {
906 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
907 let producer =
908 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
909
910 producer
911 .enqueue("a.batch".to_string(), vec![])
912 .await
913 .unwrap();
914
915 let consumer =
916 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
917 consumer.initialize().await.unwrap();
918
919 let result = consumer.read(99).await.unwrap();
920 assert!(result.is_none());
921 }
922
923 #[tokio::test]
924 async fn should_fence_old_consumer_on_peek() {
925 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
926 let consumer_a =
927 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
928 consumer_a.initialize().await.unwrap();
929
930 let consumer_b =
931 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
932 consumer_b.initialize().await.unwrap();
933
934 let result = consumer_a.peek().await;
935 assert!(matches!(result, Err(Error::Fenced)));
936 }
937
938 #[tokio::test]
939 async fn should_fence_old_consumer_on_dequeue() {
940 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
941 let consumer_a =
942 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
943 consumer_a.initialize().await.unwrap();
944
945 let consumer_b =
946 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
947 consumer_b.initialize().await.unwrap();
948
949 let result = consumer_a.dequeue(0).await;
950 assert!(matches!(result, Err(Error::Fenced)));
951 }
952
953 #[tokio::test]
954 async fn should_fence_uninitialized_consumer() {
955 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
956 let producer =
957 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
958
959 producer
960 .enqueue("a.batch".to_string(), vec![])
961 .await
962 .unwrap();
963
964 let consumer =
965 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
966
967 let result = consumer.peek().await;
968 assert!(matches!(result, Err(Error::Fenced)));
969 }
970
971 #[tokio::test]
972 async fn should_wrap_epoch_to_zero_at_max() {
973 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
974
975 let mut manifest = Manifest::empty();
976 manifest.set_epoch(u64::MAX - 1);
977 let path = Path::from(TEST_MANIFEST_PATH);
978 store
979 .put(
980 &path,
981 PutPayload::from(manifest.to_bytes().unwrap().to_vec()),
982 )
983 .await
984 .unwrap();
985
986 let consumer =
987 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
988 consumer.initialize().await.unwrap();
989
990 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
991 assert_eq!(manifest.epoch, 0);
992 }
993
994 #[tokio::test]
995 async fn should_peek_first_entry_with_valid_epoch() {
996 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
997 let producer =
998 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
999
1000 producer
1001 .enqueue("a.batch".to_string(), vec![])
1002 .await
1003 .unwrap();
1004 producer
1005 .enqueue("b.batch".to_string(), vec![])
1006 .await
1007 .unwrap();
1008
1009 let consumer =
1010 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1011 consumer.initialize().await.unwrap();
1012
1013 let entry = consumer.peek().await.unwrap().unwrap();
1014 assert_eq!(entry.location, "a.batch");
1015 }
1016
1017 #[tokio::test]
1018 async fn should_dequeue_entries_with_valid_epoch() {
1019 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1020 let producer =
1021 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1022
1023 producer
1024 .enqueue("a.batch".to_string(), vec![])
1025 .await
1026 .unwrap();
1027 producer
1028 .enqueue("b.batch".to_string(), vec![])
1029 .await
1030 .unwrap();
1031 producer
1032 .enqueue("c.batch".to_string(), vec![])
1033 .await
1034 .unwrap();
1035
1036 let consumer =
1037 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1038 consumer.initialize().await.unwrap();
1039
1040 let removed = consumer.dequeue(1).await.unwrap();
1041 assert_eq!(removed.len(), 2);
1042 assert_eq!(removed[0].location, "a.batch");
1043 assert_eq!(removed[1].location, "b.batch");
1044
1045 let next = consumer.peek().await.unwrap().unwrap();
1046 assert_eq!(next.location, "c.batch");
1047 }
1048
1049 #[tokio::test]
1050 async fn should_enqueue_after_consumer_dequeue() {
1051 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1052 let producer =
1053 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1054
1055 producer
1056 .enqueue("a.batch".to_string(), vec![])
1057 .await
1058 .unwrap();
1059 producer
1060 .enqueue("b.batch".to_string(), vec![])
1061 .await
1062 .unwrap();
1063
1064 let consumer =
1065 QueueConsumer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1066 consumer.initialize().await.unwrap();
1067
1068 consumer.dequeue(1).await.unwrap();
1069
1070 producer
1071 .enqueue("c.batch".to_string(), vec![])
1072 .await
1073 .unwrap();
1074
1075 let next = consumer.peek().await.unwrap().unwrap();
1076 assert_eq!(next.location, "c.batch");
1077 assert_eq!(next.sequence, 2);
1078 }
1079
1080 #[tokio::test]
1081 async fn should_enqueue_locations_to_manifest() {
1082 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1083 let producer =
1084 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1085
1086 producer
1087 .enqueue("path/to/file1.batch".to_string(), vec![])
1088 .await
1089 .unwrap();
1090 producer
1091 .enqueue("path/to/file2.batch".to_string(), vec![])
1092 .await
1093 .unwrap();
1094
1095 let manifest = read_producer_manifest(&store, TEST_MANIFEST_PATH).await;
1096 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1097 assert_eq!(
1098 locations,
1099 vec!["path/to/file1.batch", "path/to/file2.batch"]
1100 );
1101 }
1102
1103 #[tokio::test]
1104 async fn should_merge_with_existing_manifest() {
1105 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1106
1107 let existing = Manifest::from_entries(&[QueueEntry {
1108 sequence: 0,
1109 location: "existing/file.batch".to_string(),
1110 metadata: vec![],
1111 }]);
1112 let path = Path::from(TEST_MANIFEST_PATH);
1113 store
1114 .put(
1115 &path,
1116 PutPayload::from(existing.to_bytes().unwrap().to_vec()),
1117 )
1118 .await
1119 .unwrap();
1120
1121 let producer =
1122 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1123 producer
1124 .enqueue("new/file.batch".to_string(), vec![])
1125 .await
1126 .unwrap();
1127
1128 let manifest = read_producer_manifest(&store, "test/manifest").await;
1129 let locations: Vec<String> = manifest.iter().map(|e| e.unwrap().location).collect();
1130 assert_eq!(locations, vec!["existing/file.batch", "new/file.batch"]);
1131 }
1132
1133 fn entry(location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1134 QueueEntry::new(location.to_string(), metadata).unwrap()
1135 }
1136
1137 fn entry_seq(seq: u64, location: &str, metadata: Vec<Metadata>) -> QueueEntry {
1138 QueueEntry {
1139 sequence: seq,
1140 location: location.to_string(),
1141 metadata,
1142 }
1143 }
1144
1145 fn meta(start_index: u32, time_ms: i64, data: &str) -> Metadata {
1146 Metadata {
1147 start_index,
1148 ingestion_time_ms: time_ms,
1149 payload: Bytes::from(data.to_string()),
1150 }
1151 }
1152
1153 fn collect_locations(manifest: &Manifest) -> Vec<String> {
1154 manifest.iter().map(|e| e.unwrap().location).collect()
1155 }
1156
1157 #[test]
1158 fn should_create_empty_manifest() {
1159 let m = Manifest::empty();
1160
1161 assert_eq!(m.entries_count(), 0);
1162 assert!(m.is_empty());
1163 assert_eq!(m.epoch, 0);
1164
1165 let bytes = m.to_bytes().unwrap();
1166 assert_eq!(bytes.len(), FOOTER_SIZE);
1167 assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1168 assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1169 assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1170 assert_eq!(
1171 u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1172 MANIFEST_VERSION
1173 );
1174 }
1175
1176 #[test]
1177 fn should_parse_valid_manifest_bytes() {
1178 let entries = vec![
1179 entry_seq(0, "a", vec![meta(0, 1, "x")]),
1180 entry_seq(1, "b", vec![meta(0, 2, "y")]),
1181 ];
1182 let data = Manifest::from_entries(&entries).to_bytes().unwrap();
1183
1184 let m = Manifest::from_bytes(data).unwrap();
1185
1186 assert_eq!(m.entries_count(), 2);
1187 }
1188
1189 #[test]
1190 fn should_parse_footer_only_bytes() {
1191 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1192 buf.put_u32_le(0);
1193 buf.put_u64_le(42);
1194 buf.put_u64_le(0);
1195 buf.put_u16_le(MANIFEST_VERSION);
1196
1197 let m = Manifest::from_bytes(buf.freeze()).unwrap();
1198
1199 assert_eq!(m.entries_count(), 0);
1200 assert_eq!(m.epoch, 0);
1201
1202 let mut m = m;
1203 m.append(&entry("loc", vec![])).unwrap();
1204 let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1205 assert_eq!(entries[0].sequence, 42);
1206 }
1207
1208 #[test]
1209 fn should_reject_empty_bytes() {
1210 let err = Manifest::from_bytes(Bytes::new()).unwrap_err();
1211
1212 assert!(err.to_string().contains("must not be empty"));
1213 }
1214
1215 #[test]
1216 fn should_reject_bytes_too_short_for_footer() {
1217 let err = Manifest::from_bytes(Bytes::from_static(&[0; 21])).unwrap_err();
1218
1219 assert!(err.to_string().contains("too short for footer"));
1220 }
1221
1222 #[test]
1223 fn should_reject_wrong_version() {
1224 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1225 buf.put_u32_le(0);
1226 buf.put_u64_le(0);
1227 buf.put_u64_le(0);
1228 buf.put_u16_le(99);
1229
1230 let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1231
1232 assert!(err.to_string().contains("unsupported"));
1233 assert!(err.to_string().contains("99"));
1234 }
1235
1236 #[test]
1237 fn should_reject_version_zero() {
1238 let mut buf = BytesMut::with_capacity(FOOTER_SIZE);
1239 buf.put_u32_le(0);
1240 buf.put_u64_le(0);
1241 buf.put_u64_le(0);
1242 buf.put_u16_le(0);
1243
1244 let err = Manifest::from_bytes(buf.freeze()).unwrap_err();
1245
1246 assert!(err.to_string().contains("unsupported"));
1247 }
1248
1249 #[test]
1250 fn should_make_appended_entry_accessible_via_iter() {
1251 let mut m = Manifest::empty();
1252
1253 m.append(&entry("loc", vec![meta(0, 42, "meta")])).unwrap();
1254
1255 let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1256 assert_eq!(entries.len(), 1);
1257 assert_eq!(entries[0].sequence, 0);
1258 assert_eq!(entries[0].location, "loc");
1259 assert_eq!(entries[0].metadata, vec![meta(0, 42, "meta")]);
1260 }
1261
1262 #[test]
1263 fn should_append_to_existing_base_entries() {
1264 let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1265 let data = base.to_bytes().unwrap();
1266 let mut m = Manifest::from_bytes(data).unwrap();
1267
1268 m.append(&entry("appended", vec![])).unwrap();
1269
1270 assert_eq!(m.entries_count(), 2);
1271 assert_eq!(collect_locations(&m), vec!["base", "appended"]);
1272 let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1273 assert_eq!(entries[0].sequence, 0);
1274 assert_eq!(entries[1].sequence, 1);
1275 }
1276
1277 #[test]
1278 fn should_preserve_append_order() {
1279 let mut m = Manifest::empty();
1280
1281 m.append(&entry("a", vec![])).unwrap();
1282 m.append(&entry("b", vec![])).unwrap();
1283 m.append(&entry("c", vec![])).unwrap();
1284
1285 assert_eq!(collect_locations(&m), vec!["a", "b", "c"]);
1286 let entries: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1287 assert_eq!(entries[0].sequence, 0);
1288 assert_eq!(entries[1].sequence, 1);
1289 assert_eq!(entries[2].sequence, 2);
1290 }
1291
1292 #[test]
1293 fn should_handle_entry_with_empty_location() {
1294 let m = Manifest::from_entries(&[entry_seq(0, "", vec![])]);
1295
1296 let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1297 assert_eq!(decoded[0].location, "");
1298 assert!(decoded[0].metadata.is_empty());
1299 }
1300
1301 #[test]
1302 fn should_handle_entry_with_large_metadata() {
1303 let big_meta = Bytes::from(vec![0xAB_u8; 1024]);
1304
1305 let m = Manifest::from_entries(&[entry_seq(
1306 0,
1307 "loc",
1308 vec![Metadata {
1309 start_index: 0,
1310 ingestion_time_ms: 1,
1311 payload: big_meta.clone(),
1312 }],
1313 )]);
1314
1315 let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1316 assert_eq!(decoded[0].metadata.len(), 1);
1317 assert_eq!(decoded[0].metadata[0].payload, big_meta);
1318 }
1319
1320 #[test]
1321 fn should_handle_negative_ingestion_time() {
1322 let m = Manifest::from_entries(&[entry_seq(0, "loc", vec![meta(0, -1000, "")])]);
1323
1324 let decoded: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1325 assert_eq!(decoded[0].metadata[0].ingestion_time_ms, -1000);
1326 }
1327
1328 #[test]
1329 fn should_return_footer_for_empty_manifest() {
1330 let m = Manifest::empty();
1331
1332 let bytes = m.to_bytes().unwrap();
1333
1334 assert_eq!(bytes.len(), FOOTER_SIZE);
1335 assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), 0);
1336 assert_eq!(u64::from_le_bytes(bytes[4..12].try_into().unwrap()), 0);
1337 assert_eq!(u64::from_le_bytes(bytes[12..20].try_into().unwrap()), 0);
1338 assert_eq!(
1339 u16::from_le_bytes(bytes[20..22].try_into().unwrap()),
1340 MANIFEST_VERSION
1341 );
1342 }
1343
1344 #[test]
1345 fn should_merge_base_and_appended() {
1346 let base = Manifest::from_entries(&[entry_seq(0, "base", vec![])]);
1347 let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
1348 m.append(&entry("appended", vec![])).unwrap();
1349
1350 let serialized = m.to_bytes().unwrap();
1351 let reparsed = Manifest::from_bytes(serialized).unwrap();
1352
1353 assert_eq!(reparsed.entries_count(), 2);
1354 assert_eq!(collect_locations(&reparsed), vec!["base", "appended"]);
1355 let entries: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1356 assert_eq!(entries[0].sequence, 0);
1357 assert_eq!(entries[1].sequence, 1);
1358 }
1359
    #[test]
    fn should_write_correct_footer_count() {
        // Two base entries come from parsed bytes, three more are appended in
        // memory; the re-serialized footer must account for all five.
        let base = Manifest::from_entries(&[entry_seq(0, "a", vec![]), entry_seq(1, "b", vec![])]);
        let mut m = Manifest::from_bytes(base.to_bytes().unwrap()).unwrap();
        m.append(&entry("c", vec![])).unwrap();
        m.append(&entry("d", vec![])).unwrap();
        m.append(&entry("e", vec![])).unwrap();

        let bytes = m.to_bytes().unwrap();

        // Decode the trailing footer by hand:
        // count (u32) | next_sequence (u64) | epoch (u64) | version (u16), LE.
        let footer_start = bytes.len() - FOOTER_SIZE;
        let count = u32::from_le_bytes(bytes[footer_start..footer_start + 4].try_into().unwrap());
        let next_seq = u64::from_le_bytes(
            bytes[footer_start + 4..footer_start + 12]
                .try_into()
                .unwrap(),
        );
        let epoch = u64::from_le_bytes(
            bytes[footer_start + 12..footer_start + 20]
                .try_into()
                .unwrap(),
        );
        let version = u16::from_le_bytes(bytes[footer_start + 20..].try_into().unwrap());
        assert_eq!(count, 5);
        // Sequences 0..=4 were handed out, so the next one to assign is 5.
        assert_eq!(next_seq, 5);
        assert_eq!(epoch, 0);
        assert_eq!(version, MANIFEST_VERSION);
    }
1388
1389 #[test]
1390 fn should_round_trip_from_entries_to_bytes_from_bytes() {
1391 let entries = vec![
1392 entry_seq(0, "a", vec![meta(0, 10, "m1")]),
1393 entry_seq(1, "b", vec![meta(0, 20, "m2")]),
1394 ];
1395 let original = Manifest::from_entries(&entries);
1396
1397 let reparsed = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1398
1399 assert_eq!(reparsed.entries_count(), 2);
1400 let decoded: Vec<QueueEntry> = reparsed.iter().map(|e| e.unwrap()).collect();
1401 assert_eq!(decoded[0].sequence, 0);
1402 assert_eq!(decoded[0].location, "a");
1403 assert_eq!(decoded[0].metadata, vec![meta(0, 10, "m1")]);
1404 assert_eq!(decoded[1].sequence, 1);
1405 assert_eq!(decoded[1].location, "b");
1406 assert_eq!(decoded[1].metadata, vec![meta(0, 20, "m2")]);
1407 }
1408
1409 #[test]
1410 fn should_round_trip_append_serialize_reparse() {
1411 let mut m = Manifest::empty();
1412 m.append(&entry("x", vec![meta(0, 100, "data")])).unwrap();
1413 m.append(&entry("y", vec![meta(0, 200, "more")])).unwrap();
1414
1415 let reparsed = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1416
1417 assert_eq!(reparsed.entries_count(), 2);
1418 assert_eq!(collect_locations(&reparsed), vec!["x", "y"]);
1419 }
1420
1421 #[test]
1422 fn should_chain_serialize_reparse_append() {
1423 let original = Manifest::from_entries(&[entry_seq(0, "a", vec![])]);
1424 let mut m = Manifest::from_bytes(original.to_bytes().unwrap()).unwrap();
1425 m.append(&entry("b", vec![])).unwrap();
1426
1427 let mut m2 = Manifest::from_bytes(m.to_bytes().unwrap()).unwrap();
1428 m2.append(&entry("c", vec![])).unwrap();
1429
1430 let final_m = Manifest::from_bytes(m2.to_bytes().unwrap()).unwrap();
1431
1432 assert_eq!(final_m.entries_count(), 3);
1433 assert_eq!(collect_locations(&final_m), vec!["a", "b", "c"]);
1434 let entries: Vec<QueueEntry> = final_m.iter().map(|e| e.unwrap()).collect();
1435 assert_eq!(entries[2].sequence, 2);
1436 }
1437
1438 #[test]
1439 fn should_dequeue_entries_through_sequence() {
1440 let mut m = Manifest::empty();
1441 for _ in 0..5 {
1442 m.append(&entry("loc", vec![])).unwrap();
1443 }
1444
1445 let removed = m.dequeue(2).unwrap();
1446
1447 assert_eq!(removed.len(), 3);
1448 assert_eq!(removed[0].sequence, 0);
1449 assert_eq!(removed[1].sequence, 1);
1450 assert_eq!(removed[2].sequence, 2);
1451 assert_eq!(m.entries_count(), 2);
1452 let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1453 assert_eq!(remaining[0].sequence, 3);
1454 assert_eq!(remaining[1].sequence, 4);
1455 assert_eq!(m.next_sequence, 5);
1456 }
1457
1458 #[test]
1459 fn should_dequeue_all_entries() {
1460 let mut m = Manifest::empty();
1461 for _ in 0..3 {
1462 m.append(&entry("loc", vec![])).unwrap();
1463 }
1464
1465 let removed = m.dequeue(2).unwrap();
1466
1467 assert_eq!(removed.len(), 3);
1468 assert!(m.is_empty());
1469 assert_eq!(m.next_sequence, 3);
1470 }
1471
1472 #[test]
1473 fn should_dequeue_nothing_when_sequence_below_first() {
1474 let entries = vec![
1475 entry_seq(5, "a", vec![]),
1476 entry_seq(6, "b", vec![]),
1477 entry_seq(7, "c", vec![]),
1478 ];
1479 let mut m = Manifest::from_entries(&entries);
1480
1481 let removed = m.dequeue(3).unwrap();
1482
1483 assert!(removed.is_empty());
1484 assert_eq!(m.entries_count(), 3);
1485 }
1486
1487 #[test]
1488 fn should_append_after_dequeue() {
1489 let mut m = Manifest::empty();
1490 for _ in 0..3 {
1491 m.append(&entry("loc", vec![])).unwrap();
1492 }
1493
1494 m.dequeue(0).unwrap();
1495
1496 assert_eq!(m.entries_count(), 2);
1497 let remaining: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1498 assert_eq!(remaining[0].sequence, 1);
1499 assert_eq!(remaining[1].sequence, 2);
1500
1501 m.append(&entry("new", vec![])).unwrap();
1502 let all: Vec<QueueEntry> = m.iter().map(|e| e.unwrap()).collect();
1503 assert_eq!(all.len(), 3);
1504 assert_eq!(all[2].sequence, 3);
1505 }
1506
1507 fn encode_entry_bytes(entry: &QueueEntry) -> Vec<u8> {
1509 let mut buf = BytesMut::new();
1510 Manifest::encode_entry(&mut buf, entry).unwrap();
1511 buf.to_vec()
1512 }
1513
1514 fn manifest_from_raw_entry(entry_bytes: &[u8]) -> Manifest {
1517 let mut buf = BytesMut::new();
1518 buf.extend_from_slice(entry_bytes);
1519 buf.put_u32_le(1); buf.put_u64_le(1); buf.put_u64_le(0); buf.put_u16_le(MANIFEST_VERSION);
1523 Manifest::from_bytes(buf.freeze()).unwrap()
1524 }
1525
    // Byte offset of the metadata-count field inside an entry encoded by
    // `corrupt_metadata_entry`: length prefix + sequence + location-length
    // field + the location itself (the `+ 1` is the single byte of "a").
    const METADATA_COUNT_OFFSET: usize = ENTRY_LEN_SIZE + SEQUENCE_SIZE + LOCATION_LEN_SIZE + 1;

    /// Encodes a valid entry (location "a", no metadata) and then corrupts it:
    /// overwrites the metadata count with `count`, appends `extra_bytes` as
    /// (usually truncated) metadata content, and patches the entry-length
    /// prefix so the decoder walks into the corrupted region.
    fn corrupt_metadata_entry(count: u32, extra_bytes: &[u8]) -> Vec<u8> {
        let e = QueueEntry {
            sequence: 1,
            location: "a".to_string(),
            metadata: vec![],
        };
        let mut raw = encode_entry_bytes(&e);
        // Lie about how many metadata records follow.
        raw[METADATA_COUNT_OFFSET..METADATA_COUNT_OFFSET + 4].copy_from_slice(&count.to_le_bytes());
        raw.extend_from_slice(extra_bytes);
        // Re-stamp the length prefix so the entry frame itself stays consistent.
        let new_entry_len = (raw.len() - ENTRY_LEN_SIZE) as u32;
        raw[..ENTRY_LEN_SIZE].copy_from_slice(&new_entry_len.to_le_bytes());
        raw
    }
1549
1550 #[test]
1551 fn should_reject_trailing_bytes_before_footer() {
1552 let mut raw = encode_entry_bytes(&entry_seq(0, "loc", vec![]));
1554 raw.extend_from_slice(&[0xFFu8; 5]); let manifest = manifest_from_raw_entry(&raw);
1557 let items: Vec<Result<QueueEntry>> = manifest.iter().collect();
1558 assert_eq!(items.len(), 2);
1559 assert!(items[0].is_ok());
1560 let err = items[1].as_ref().unwrap_err();
1561 assert!(
1562 err.to_string().contains("did not consume all bytes"),
1563 "got: {}",
1564 err
1565 );
1566 }
1567
1568 #[test]
1569 fn should_reject_entry_with_entry_len_below_minimum() {
1570 let bad_entry_len = (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE - 1) as u32;
1572 let mut raw = Vec::new();
1573 raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1574 raw.extend_from_slice(&[0u8; 13]); let manifest = manifest_from_raw_entry(&raw);
1577 let err = manifest.iter().next().unwrap().unwrap_err();
1578 assert!(err.to_string().contains(
1579 "entry length 13 is less than minimum entry length 14 for the length of the location 0"
1580 ));
1581 }
1582
1583 #[test]
1584 fn should_reject_entry_with_entry_len_below_minimum_for_location() {
1585 let location = "abc";
1586 let bad_entry_len =
1588 (SEQUENCE_SIZE + LOCATION_LEN_SIZE + METADATA_COUNT_SIZE + location.len() - 1) as u32;
1589 let mut raw = Vec::new();
1590 raw.extend_from_slice(&bad_entry_len.to_le_bytes());
1591 raw.extend_from_slice(&0u64.to_le_bytes()); raw.extend_from_slice(&(location.len() as u16).to_le_bytes()); raw.extend_from_slice(&[0u8; 20]); let manifest = manifest_from_raw_entry(&raw);
1596 let err = manifest.iter().next().unwrap().unwrap_err();
1597 assert!(
1598 err.to_string()
1599 .contains("entry length 16 is less than minimum entry length 17"),
1600 "got: {}",
1601 err
1602 );
1603 }
1604
1605 #[test]
1606 fn should_reject_entry_truncated_before_entry_len() {
1607 let manifest = manifest_from_raw_entry(&[0u8; 2]);
1609 let err = manifest.iter().next().unwrap().unwrap_err();
1610 assert!(
1611 matches!(&err, Error::Serialization(msg) if msg.contains("entry length field does not fit")),
1612 "unexpected error: {err}"
1613 );
1614 }
1615
1616 #[test]
1617 fn should_reject_entry_truncated_before_metadata_start_index() {
1618 let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &[]));
1620 let err = manifest.iter().next().unwrap().unwrap_err();
1621 assert!(
1622 matches!(&err, Error::Serialization(msg) if msg.contains("start index field does not fit")),
1623 "unexpected error: {err}"
1624 );
1625 }
1626
1627 #[test]
1628 fn should_reject_entry_truncated_before_metadata_ingestion_time() {
1629 let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &0u32.to_le_bytes()));
1631 let err = manifest.iter().next().unwrap().unwrap_err();
1632 assert!(
1633 matches!(&err, Error::Serialization(msg) if msg.contains("ingestion time field does not fit")),
1634 "unexpected error: {err}"
1635 );
1636 }
1637
1638 #[test]
1639 fn should_reject_entry_truncated_before_metadata_length() {
1640 let mut extra = Vec::new();
1642 extra.extend_from_slice(&0u32.to_le_bytes()); extra.extend_from_slice(&0i64.to_le_bytes()); let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1645 let err = manifest.iter().next().unwrap().unwrap_err();
1646 assert!(
1647 matches!(&err, Error::Serialization(msg) if msg.contains("metadata length field does not fit")),
1648 "unexpected error: {err}"
1649 );
1650 }
1651
1652 #[test]
1653 fn should_reject_entry_truncated_before_metadata_payload() {
1654 let mut extra = Vec::new();
1656 extra.extend_from_slice(&0u32.to_le_bytes()); extra.extend_from_slice(&0i64.to_le_bytes()); extra.extend_from_slice(&10u32.to_le_bytes()); extra.extend_from_slice(&[0xAB, 0xCD]); let manifest = manifest_from_raw_entry(&corrupt_metadata_entry(1, &extra));
1661 let err = manifest.iter().next().unwrap().unwrap_err();
1662 assert!(
1663 matches!(&err, Error::Serialization(msg) if msg.contains("metadata has less bytes than set")),
1664 "unexpected error: {err}"
1665 );
1666 }
1667
1668 #[tokio::test]
1669 async fn should_reject_location_exceeding_u16_max() {
1670 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1671 let producer =
1672 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
1673
1674 let long_location = "x".repeat(u16::MAX as usize + 1);
1675 let result = producer.enqueue(long_location, vec![]).await;
1676 assert!(matches!(result, Err(Error::InvalidInput(msg)) if msg.contains("location length")));
1677 }
1678}