danube_core/
storage.rs

1use crate::message::StreamMessage;
2use async_trait::async_trait;
3use futures_core::Stream;
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6use thiserror::Error;
7
8/// Start position for a subscription/reader.
9#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
10pub enum StartPosition {
11    /// Start from the current tail (latest offset + 1)
12    Latest,
13    /// Start from a concrete offset (inclusive)
14    Offset(u64),
15}
16
17#[derive(Debug, Error)]
18pub enum PersistentStorageError {
19    #[error("I/O error: {0}")]
20    Io(String),
21
22    #[error("WAL error: {0}")]
23    Wal(String),
24
25    #[error("Cloud storage error: {0}")]
26    Cloud(String),
27
28    #[error("Metadata store error: {0}")]
29    Metadata(String),
30
31    #[error("Unsupported: {0}")]
32    Unsupported(String),
33
34    #[error("Other: {0}")]
35    Other(String),
36}
37
38/// A boxed async stream of messages for a topic.
39pub type TopicStream =
40    Pin<Box<dyn Stream<Item = Result<StreamMessage, PersistentStorageError>> + Send>>;
41
42/// A persistent storage interface for a topic.
43#[async_trait]
44pub trait PersistentStorage: Send + Sync + std::fmt::Debug + 'static {
45    /// Append a message to a topic and return the assigned offset.
46    async fn append_message(
47        &self,
48        topic_name: &str,
49        msg: StreamMessage,
50    ) -> Result<u64, PersistentStorageError>;
51
52    /// Create a streaming reader starting from the provided position.
53    async fn create_reader(
54        &self,
55        topic_name: &str,
56        start: StartPosition,
57    ) -> Result<TopicStream, PersistentStorageError>;
58
59    /// Acknowledge internal checkpoints (used by background uploader/compactor).
60    async fn ack_checkpoint(
61        &self,
62        topic_name: &str,
63        up_to_offset: u64,
64    ) -> Result<(), PersistentStorageError>;
65
66    /// Optionally force a flush of buffered data.
67    async fn flush(&self, topic_name: &str) -> Result<(), PersistentStorageError>;
68}