// streambed/commit_log.rs

//! [A commit log is an append-only data structure that can be used
//! in a variety of use-cases, such as tracking sequences of events,
//! transactions or replicated state machines](https://docs.rs/commitlog/latest/commitlog/).
//!
//! Commit log functionality that is modelled on [Apache Kafka's](https://kafka.apache.org/)
//! API, and can be implemented with multiple types of backend
//! e.g. one that uses the Kafka HTTP REST API.

use std::{pin::Pin, time::Duration};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::{base64::Base64, serde_as};
use smol_str::SmolStr;
use tokio_stream::Stream;

/// An offset into a commit log. Offsets are used to address
/// records and can be relied on to have an ascending order.
pub type Offset = u64;

/// Each record in a commit log has a key. How the key is formed
/// is an application concern. By way of an example, keys can be
/// used to associate to an entity.
pub type Key = u64;

/// Topics can be distributed into partitions which, in turn,
/// enable scaling.
pub type Partition = u32;

/// A topic to subscribe to or has been subscribed to. Topics
/// may be namespaced by prefixing with characters followed by
/// a `:`. For example, "my-ns:my-topic". In the absence of
/// a namespace, the server will assume a default namespace.
pub type Topic = SmolStr;

/// The key of a [`Header`] e.g. a meta-data field name.
pub type HeaderKey = SmolStr;

/// A header provides a means of augmenting a record with
/// meta-data.
#[serde_as]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Header {
    /// The name of this item of meta-data.
    pub key: HeaderKey,
    /// The header's raw bytes; represented as Base64 text when serialized.
    #[serde_as(as = "Base64")]
    pub value: Vec<u8>,
}

/// A declaration of an offset to be committed to a topic.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ConsumerOffset {
    /// The topic the offset applies to.
    pub topic: Topic,
    /// The partition of the topic the offset applies to.
    pub partition: Partition,
    /// The offset to commit for the topic/partition.
    pub offset: Offset,
}

/// A declaration of a topic to subscribe to
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Subscription {
    /// The topic to subscribe to; may be namespaced — see [`Topic`].
    pub topic: Topic,
}

/// A declaration of a consumer group session to connect with.
/// In the case that offsets are supplied, these offsets are
/// associated with their respective topics such that any
/// subsequent subscription will source from the offset.
/// In the case where subscriptions are supplied, the consumer
/// instance will subscribe and reply with a stream of records
/// ending only when the connection to the topic is severed.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Consumer {
    /// Offsets to resume consumption from. A JSON `null` is
    /// tolerated and deserialized as the empty vector.
    #[serde(deserialize_with = "nullable_vec")]
    pub offsets: Vec<ConsumerOffset>,
    /// The topics to subscribe to.
    pub subscriptions: Vec<Subscription>,
}

78/// A declaration of a record produced by a subscription
79#[serde_as]
80#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
81pub struct ConsumerRecord {
82    pub topic: Topic,
83    #[serde(deserialize_with = "nullable_vec")]
84    pub headers: Vec<Header>,
85    pub timestamp: Option<DateTime<Utc>>,
86    pub key: Key,
87    #[serde_as(as = "Base64")]
88    pub value: Vec<u8>,
89    pub partition: Partition,
90    pub offset: Offset,
91}
92
/// The reply to an offsets request
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct PartitionOffsets {
    /// The earliest offset available in the partition.
    pub beginning_offset: Offset,
    /// The latest offset of the partition. NOTE(review): whether this
    /// is the last record's offset or one-past-the-end (Kafka-style)
    /// is not established here — confirm against the backend.
    pub end_offset: Offset,
}

100/// A declaration of a record to be produced to a topic
101#[serde_as]
102#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
103pub struct ProducerRecord {
104    pub topic: Topic,
105    #[serde(deserialize_with = "nullable_vec")]
106    pub headers: Vec<Header>,
107    pub timestamp: Option<DateTime<Utc>>,
108    pub key: Key,
109    #[serde_as(as = "Base64")]
110    pub value: Vec<u8>,
111    pub partition: Partition,
112}
113
/// The reply to a publish request
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ProducedOffset {
    /// The offset the commit log assigned to the produced record.
    pub offset: Offset,
}

/// There was a problem producing a record
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ProducerError {
    /// The commit log received the request but was unable to process it.
    CannotProduce,
}

// Implementing `Display` and `std::error::Error` lets callers format
// the error and box it as `dyn Error`, which the bare enum previously
// did not permit.
impl std::fmt::Display for ProducerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProducerError::CannotProduce => f.write_str("cannot produce the record"),
        }
    }
}

impl std::error::Error for ProducerError {}

/// A commit log holds topics and can be appended to and tailed.
/// Connections are managed and retried if they cannot be established.
#[async_trait]
pub trait CommitLog: Clone + Send + Sync {
    /// Retrieve the current offsets of a topic if they are present,
    /// or `None` when they cannot be obtained.
    async fn offsets(&self, topic: Topic, partition: Partition) -> Option<PartitionOffsets>;

    /// Publish a record and return the offset that was assigned.
    ///
    /// # Errors
    /// Returns [`ProducerError::CannotProduce`] when the commit log
    /// received the request but was unable to process it.
    async fn produce(&self, record: ProducerRecord) -> Result<ProducedOffset, ProducerError>;

    /// Subscribe to one or more topics for a given consumer group
    /// having committed zero or more topics. The records are streamed
    /// back indefinitely unless an idle timeout argument is provided.
    /// In the case of an idle timeout, if no record is received
    /// within that period, None is returned to end the stream.
    fn scoped_subscribe<'a>(
        &'a self,
        consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>>;
}

151fn nullable_vec<'de, D, T>(d: D) -> Result<Vec<T>, D::Error>
152where
153    D: Deserializer<'de>,
154    T: Deserialize<'de>,
155{
156    Deserialize::deserialize(d).map(|x: Option<_>| x.unwrap_or_default())
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a `Consumer` from JSON, panicking on malformed input.
    fn consumer_from(json: &str) -> Consumer {
        serde_json::from_str::<Consumer>(json).unwrap()
    }

    /// A `Consumer` with no offsets and no subscriptions.
    fn empty_consumer() -> Consumer {
        Consumer {
            offsets: vec![],
            subscriptions: vec![],
        }
    }

    #[test]
    fn test_nullable_vec_handles_null() {
        let json = r#"{ "offsets": null, "subscriptions": [] }"#;
        assert_eq!(consumer_from(json), empty_consumer());
    }

    #[test]
    fn test_nullable_vec_handles_empty_vec() {
        let json = r#"{ "offsets": [], "subscriptions": [] }"#;
        assert_eq!(consumer_from(json), empty_consumer());
    }

    #[test]
    fn test_nullable_vec_handles_vec() {
        let json =
            r#"{ "offsets": [{"topic": "topic", "partition": 0, "offset": 0}], "subscriptions": [] }"#;
        assert_eq!(
            consumer_from(json),
            Consumer {
                offsets: vec![ConsumerOffset {
                    topic: Topic::from("topic"),
                    partition: 0,
                    offset: 0
                }],
                subscriptions: vec![]
            }
        );
    }
}