Skip to main content

sierradb_client/
options.rs

1use std::{
2    borrow::Cow,
3    time::{SystemTime, UNIX_EPOCH},
4};
5
6use redis::{RedisWrite, ToRedisArgs};
7use sierradb_protocol::ExpectedVersion;
8use uuid::Uuid;
9
10/// Options for the EAPPEND command
11#[derive(Clone, Default)]
12pub struct EAppendOptions<'a> {
13    event_id: Option<Uuid>,
14    partition_key: Option<Uuid>,
15    expected_version: ExpectedVersion,
16    timestamp: Option<u64>,
17    payload: Cow<'a, [u8]>,
18    metadata: Cow<'a, [u8]>,
19}
20
21impl<'a> EAppendOptions<'a> {
22    pub fn new() -> Self {
23        Self {
24            event_id: None,
25            partition_key: None,
26            expected_version: ExpectedVersion::Any,
27            timestamp: None,
28            payload: Cow::Borrowed(&[]),
29            metadata: Cow::Borrowed(&[]),
30        }
31    }
32
33    pub fn event_id(mut self, event_id: Uuid) -> Self {
34        self.event_id = Some(event_id);
35        self
36    }
37
38    pub fn partition_key(mut self, partition_key: Uuid) -> Self {
39        self.partition_key = Some(partition_key);
40        self
41    }
42
43    pub fn expected_version(mut self, expected_version: ExpectedVersion) -> Self {
44        self.expected_version = expected_version;
45        self
46    }
47
48    pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
49        self.timestamp = Some(
50            timestamp
51                .duration_since(UNIX_EPOCH)
52                .unwrap()
53                .as_millis()
54                .try_into()
55                .unwrap(),
56        );
57        self
58    }
59
60    pub fn payload(mut self, payload: impl Into<Cow<'a, [u8]>>) -> Self {
61        self.payload = payload.into();
62        self
63    }
64
65    pub fn metadata(mut self, metadata: impl Into<Cow<'a, [u8]>>) -> Self {
66        self.metadata = metadata.into();
67        self
68    }
69}
70
71impl<'a> ToRedisArgs for EAppendOptions<'a> {
72    fn write_redis_args<W>(&self, out: &mut W)
73    where
74        W: ?Sized + RedisWrite,
75    {
76        if let Some(event_id) = &self.event_id {
77            out.write_arg(b"EVENT_ID");
78            out.write_arg(event_id.to_string().as_bytes());
79        }
80        if let Some(partition_key) = &self.partition_key {
81            out.write_arg(b"PARTITION_KEY");
82            out.write_arg(partition_key.to_string().as_bytes());
83        }
84        match self.expected_version {
85            ExpectedVersion::Any => {}
86            ExpectedVersion::Exists => {
87                out.write_arg(b"EXPECTED_VERSION");
88                out.write_arg(b"EXISTS");
89            }
90            ExpectedVersion::Empty => {
91                out.write_arg(b"EXPECTED_VERSION");
92                out.write_arg(b"EMPTY");
93            }
94            ExpectedVersion::Exact(version) => {
95                out.write_arg(b"EXPECTED_VERSION");
96                out.write_arg(version.to_string().as_bytes());
97            }
98        }
99        if let Some(timestamp) = self.timestamp {
100            out.write_arg(b"TIMESTAMP");
101            out.write_arg(timestamp.to_string().as_bytes());
102        }
103        if !self.payload.is_empty() {
104            out.write_arg(b"PAYLOAD");
105            out.write_arg(&self.payload);
106        }
107        if !self.metadata.is_empty() {
108            out.write_arg(b"METADATA");
109            out.write_arg(&self.metadata);
110        }
111    }
112}
113
114/// Event configuration for the EMAPPEND command
115#[derive(Clone, Debug, Default, PartialEq, Eq)]
116pub struct EMAppendEvent<'a> {
117    stream_id: Cow<'a, str>,
118    event_name: Cow<'a, str>,
119    event_id: Option<Uuid>,
120    expected_version: ExpectedVersion,
121    timestamp: Option<u64>,
122    payload: Cow<'a, [u8]>,
123    metadata: Cow<'a, [u8]>,
124}
125
126impl<'a> EMAppendEvent<'a> {
127    pub fn new(
128        stream_id: impl Into<Cow<'a, str>>,
129        event_name: impl Into<Cow<'a, str>>,
130    ) -> EMAppendEvent<'a> {
131        EMAppendEvent {
132            stream_id: stream_id.into(),
133            event_name: event_name.into(),
134            event_id: None,
135            expected_version: ExpectedVersion::Any,
136            timestamp: None,
137            payload: Cow::Borrowed(&[]),
138            metadata: Cow::Borrowed(&[]),
139        }
140    }
141
142    pub fn event_id(mut self, event_id: Uuid) -> Self {
143        self.event_id = Some(event_id);
144        self
145    }
146
147    pub fn expected_version(mut self, expected_version: ExpectedVersion) -> Self {
148        self.expected_version = expected_version;
149        self
150    }
151
152    pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
153        self.timestamp = Some(
154            timestamp
155                .duration_since(UNIX_EPOCH)
156                .unwrap()
157                .as_millis()
158                .try_into()
159                .unwrap(),
160        );
161        self
162    }
163
164    pub fn payload(mut self, payload: impl Into<Cow<'a, [u8]>>) -> Self {
165        self.payload = payload.into();
166        self
167    }
168
169    pub fn metadata(mut self, metadata: impl Into<Cow<'a, [u8]>>) -> Self {
170        self.metadata = metadata.into();
171        self
172    }
173}
174
175impl<'a> ToRedisArgs for EMAppendEvent<'a> {
176    fn write_redis_args<W>(&self, out: &mut W)
177    where
178        W: ?Sized + RedisWrite,
179    {
180        out.write_arg(self.stream_id.as_bytes());
181        out.write_arg(self.event_name.as_bytes());
182        if let Some(event_id) = &self.event_id {
183            out.write_arg(b"EVENT_ID");
184            out.write_arg(event_id.to_string().as_bytes());
185        }
186        match self.expected_version {
187            ExpectedVersion::Any => {}
188            ExpectedVersion::Exists => {
189                out.write_arg(b"EXPECTED_VERSION");
190                out.write_arg(b"EXISTS");
191            }
192            ExpectedVersion::Empty => {
193                out.write_arg(b"EXPECTED_VERSION");
194                out.write_arg(b"EMPTY");
195            }
196            ExpectedVersion::Exact(version) => {
197                out.write_arg(b"EXPECTED_VERSION");
198                out.write_arg(version.to_string().as_bytes());
199            }
200        }
201        if let Some(timestamp) = self.timestamp {
202            out.write_arg(b"TIMESTAMP");
203            out.write_arg(timestamp.to_string().as_bytes());
204        }
205        if !self.payload.is_empty() {
206            out.write_arg(b"PAYLOAD");
207            out.write_arg(&self.payload);
208        }
209        if !self.metadata.is_empty() {
210            out.write_arg(b"METADATA");
211            out.write_arg(&self.metadata);
212        }
213    }
214}