sierradb_client/commands.rs
1use redis::{Client, ConnectionLike, RedisResult, ToRedisArgs, cmd};
2use uuid::Uuid;
3
4use crate::HelloResp;
5use crate::options::{EAppendOptions, EMAppendEvent};
6use crate::subscription::SubscriptionManager;
7use crate::types::{AppendInfo, Event, EventBatch, MultiAppendInfo, RangeValue, SubscriptionInfo};
8
9implement_commands! {
10 'a
11
12 /// Append an event to a stream.
13 ///
14 /// # Parameters
15 /// - `stream_id`: Stream identifier to append the event to
16 /// - `event_name`: Name/type of the event
17 /// - `options`: Configuration for optional parameters (event_id, partition_key, expected_version, payload, metadata)
18 ///
19 /// # Example
20 /// ```ignore
21 /// let options = EAppendOptions::new()
22 /// .payload(br#"{"name":"john"}"#)
23 /// .metadata(br#"{"source":"api"}"#);
24 /// conn.eappend("my-stream", "UserCreated", options)?;
25 /// ```
26 ///
27 /// # Returns
28 /// Returns an `AppendInfo` containing event metadata.
29 fn eappend<S: ToRedisArgs, E: ToRedisArgs>(stream_id: S, event_name: E, options: EAppendOptions<'a>) -> (AppendInfo) {
30 cmd("EAPPEND").arg(stream_id).arg(event_name).arg(options)
31 }
32
33 /// Append multiple events to streams in a single transaction.
34 ///
35 /// # Parameters
36 /// - `partition_key`: UUID that determines which partition all events will be written to
37 /// - `events`: Array of events to append, each with their own configuration
38 ///
39 /// # Example
40 /// ```ignore
41 /// let events = [
42 /// EMAppendEvent::new("stream1", "EventA").payload(br#"{"data":"value1"}"#),
43 /// EMAppendEvent::new("stream2", "EventB").payload(br#"{"data":"value2"}"#),
44 /// ];
45 /// conn.emappend(partition_key, &events)?;
46 /// ```
47 ///
48 /// **Note:** All events are appended atomically in a single transaction.
49 ///
50 /// # Returns
51 /// Returns transaction result information.
52 fn emappend<>(partition_key: Uuid, events: &'a [EMAppendEvent<'a>]) -> (MultiAppendInfo) {
53 cmd("EMAPPEND").arg(partition_key.to_string()).arg(events)
54 }
55
56 /// Get an event by its unique identifier.
57 ///
58 /// # Parameters
59 /// - `event_id`: UUID of the event to retrieve
60 ///
61 /// # Example
62 /// ```ignore
63 /// let event_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
64 /// let event = conn.eget(event_id)?;
65 /// ```
66 ///
67 /// # Returns
68 /// Returns `Some(Event)` if found, `None` if the event doesn't exist.
69 fn eget<>(event_id: Uuid) -> (Option<Event>) {
70 cmd("EGET").arg(event_id.to_string())
71 }
72
73 /// Scan events in a partition by partition key.
74 ///
75 /// # Parameters
76 /// - `partition_key`: UUID key to determine which partition to scan
77 /// - `start_sequence`: Starting sequence number
78 /// - `end_sequence`: Ending sequence number (`None` means scan to end)
79 /// - `count`: Maximum number of events to return (defaults to 100)
80 ///
81 /// # Example
82 /// ```ignore
83 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
84 /// let batch = conn.epscan_by_key(partition_key, 0, Some(100), Some(50))?;
85 /// ```
86 ///
87 /// # Returns
88 /// Returns an `EventBatch` containing events and pagination info.
89 fn epscan_by_key<>(partition_key: Uuid, start_sequence: u64, end_sequence: Option<u64>, count: Option<u64>) -> (EventBatch) {
90 cmd("EPSCAN").arg(partition_key.to_string()).arg(start_sequence).arg(match end_sequence {
91 Some(seq) => RangeValue::Value(seq),
92 None => RangeValue::End,
93 }).arg("COUNT").arg(count.unwrap_or(100))
94 }
95
96 /// Scan events in a partition by partition ID.
97 ///
98 /// # Parameters
99 /// - `partition_id`: Numeric partition identifier (0-65535)
100 /// - `start_sequence`: Starting sequence number
101 /// - `end_sequence`: Ending sequence number (`None` means scan to end)
102 /// - `count`: Maximum number of events to return (defaults to 100)
103 ///
104 /// # Example
105 /// ```ignore
106 /// let batch = conn.epscan_by_id(42, 100, Some(200), Some(50))?;
107 /// ```
108 ///
109 /// # Returns
110 /// Returns an `EventBatch` containing events and pagination info.
111 fn epscan_by_id<>(partition_id: u16, start_sequence: u64, end_sequence: Option<u64>, count: Option<u64>) -> (EventBatch) {
112 cmd("EPSCAN").arg(partition_id).arg(start_sequence).arg(match end_sequence {
113 Some(seq) => RangeValue::Value(seq),
114 None => RangeValue::End,
115 }).arg("COUNT").arg(count.unwrap_or(100))
116 }
117
118 /// Scan events in a stream by stream ID.
119 ///
120 /// # Parameters
121 /// - `stream_id`: Stream identifier to scan
122 /// - `start_version`: Starting version number
123 /// - `end_version`: Ending version number (`None` means scan to end)
124 /// - `count`: Maximum number of events to return (defaults to 100)
125 ///
126 /// # Example
127 /// ```ignore
128 /// let events = conn.escan("my-stream", 0, Some(100), Some(50))?;
129 /// ```
130 ///
131 /// # Returns
132 /// Returns events from the stream across all partitions.
133 fn escan<>(stream_id: &'a str, start_version: u64, end_version: Option<u64>, count: Option<u64>) -> (EventBatch) {
134 cmd("ESCAN").arg(stream_id).arg(start_version).arg(match end_version {
135 Some(seq) => RangeValue::Value(seq),
136 None => RangeValue::End,
137 }).arg("COUNT").arg(count.unwrap_or(100))
138 }
139
140 /// Scan events in a stream by stream ID within a specific partition.
141 ///
142 /// # Parameters
143 /// - `stream_id`: Stream identifier to scan
144 /// - `partition_key`: UUID to scan within a specific partition only
145 /// - `start_version`: Starting version number
146 /// - `end_version`: Ending version number (`None` means scan to end)
147 /// - `count`: Maximum number of events to return (defaults to 100)
148 ///
149 /// # Example
150 /// ```ignore
151 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
152 /// let batch = conn.escan_with_partition_key("my-stream", partition_key, 0, Some(100), Some(50))?;
153 /// ```
154 ///
155 /// # Returns
156 /// Returns an `EventBatch` containing events from the specified partition only.
157 fn escan_with_partition_key<>(stream_id: &'a str, partition_key: Uuid, start_version: u64, end_version: Option<u64>, count: Option<u64>) -> (EventBatch) {
158 cmd("ESCAN").arg(stream_id).arg(start_version).arg(match end_version {
159 Some(seq) => RangeValue::Value(seq),
160 None => RangeValue::End,
161 }).arg("COUNT").arg(count.unwrap_or(100)).arg("PARTITION_KEY").arg(partition_key.to_string())
162 }
163
164 /// Get the current sequence number for a partition by partition key.
165 ///
166 /// # Parameters
167 /// - `partition_key`: UUID key to determine which partition to query
168 ///
169 /// # Example
170 /// ```ignore
171 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
172 /// let sequence = conn.epseq_by_key(partition_key)?;
173 /// ```
174 ///
175 /// # Returns
176 /// Returns the current sequence number for the partition.
177 fn epseq_by_key<>(partition_key: Uuid) -> (Option<u64>) {
178 cmd("EPSEQ").arg(partition_key.to_string())
179 }
180
181 /// Get the current sequence number for a partition by partition ID.
182 ///
183 /// # Parameters
184 /// - `partition_id`: Numeric partition identifier (0-65535)
185 ///
186 /// # Example
187 /// ```ignore
188 /// let sequence = conn.epseq_by_id(42)?;
189 /// ```
190 ///
191 /// # Returns
192 /// Returns the current sequence number for the partition.
193 fn epseq_by_id<>(partition_id: u16) -> (Option<u64>) {
194 cmd("EPSEQ").arg(partition_id)
195 }
196
197 /// Get the current version number for a stream.
198 ///
199 /// # Parameters
200 /// - `stream_id`: Stream identifier to get version for
201 ///
202 /// # Example
203 /// ```ignore
204 /// let version = conn.esver("my-stream")?;
205 /// ```
206 ///
207 /// # Returns
208 /// Returns the current version number for the stream.
209 fn esver<>(stream_id: &'a str) -> (Option<u64>) {
210 cmd("ESVER").arg(stream_id)
211 }
212
213 /// Get the current version number for a stream within a specific partition.
214 ///
215 /// # Parameters
216 /// - `stream_id`: Stream identifier to get version for
217 /// - `partition_key`: UUID to check specific partition
218 ///
219 /// # Example
220 /// ```ignore
221 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
222 /// let version = conn.esver_with_partition_key("my-stream", partition_key)?;
223 /// ```
224 ///
225 /// # Returns
226 /// Returns the current version number for the stream in the specified partition.
227 fn esver_with_partition_key<>(stream_id: &'a str, partition_key: Uuid) -> (Option<u64>) {
228 cmd("ESVER").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string())
229 }
230
231 /// Subscribe to events from a stream.
232 ///
233 /// # Parameters
234 /// - `stream_id`: Stream identifier to subscribe to
235 ///
236 /// # Example
237 /// ```ignore
238 /// let subscription = conn.esub("my-stream")?;
239 /// ```
240 ///
241 /// # Returns
242 /// Returns subscription information.
243 ///
244 /// **Note:** Establishes a persistent connection to receive real-time stream events.
245 fn esub<S: ToRedisArgs>(stream_id: S) -> (SubscriptionInfo) {
246 cmd("ESUB").arg(stream_id)
247 }
248
249 /// Subscribe to events from a stream with partition key.
250 ///
251 /// # Parameters
252 /// - `stream_id`: Stream identifier to subscribe to
253 /// - `partition_key`: UUID to subscribe to specific partition
254 ///
255 /// # Example
256 /// ```ignore
257 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
258 /// let subscription = conn.esub_with_partition_key("my-stream", partition_key)?;
259 /// ```
260 ///
261 /// # Returns
262 /// Returns subscription information.
263 fn esub_with_partition_key<S: ToRedisArgs>(stream_id: S, partition_key: Uuid) -> (SubscriptionInfo) {
264 cmd("ESUB").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string())
265 }
266
267 /// Subscribe to events from a stream starting from a specific version.
268 ///
269 /// # Parameters
270 /// - `stream_id`: Stream identifier to subscribe to
271 /// - `from_version`: Start streaming from this version number
272 ///
273 /// # Example
274 /// ```ignore
275 /// let subscription = conn.esub_from_version("my-stream", 100)?;
276 /// ```
277 ///
278 /// # Returns
279 /// Returns subscription information.
280 fn esub_from_version<S: ToRedisArgs>(stream_id: S, from_version: u64) -> (SubscriptionInfo) {
281 cmd("ESUB").arg(stream_id).arg("FROM").arg(from_version)
282 }
283
284 /// Subscribe to events from a stream with partition key and version.
285 ///
286 /// # Parameters
287 /// - `stream_id`: Stream identifier to subscribe to
288 /// - `partition_key`: UUID to subscribe to specific partition
289 /// - `from_version`: Starting version number
290 ///
291 /// # Example
292 /// ```ignore
293 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
294 /// let subscription = conn.esub_with_partition_and_version("my-stream", partition_key, 100)?;
295 /// ```
296 ///
297 /// # Returns
298 /// Returns subscription information.
299 fn esub_with_partition_and_version<S: ToRedisArgs>(stream_id: S, partition_key: Uuid, from_version: u64) -> (SubscriptionInfo) {
300 cmd("ESUB").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string()).arg("FROM").arg(from_version)
301 }
302
303 /// Subscribe to events from a partition by partition key.
304 ///
305 /// # Parameters
306 /// - `partition_key`: UUID key to determine which partition to subscribe to
307 ///
308 /// # Example
309 /// ```ignore
310 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
311 /// let subscription = conn.epsub_by_key(partition_key)?;
312 /// ```
313 ///
314 /// # Returns
315 /// Returns subscription information.
316 fn epsub_by_key<>(partition_key: Uuid) -> (SubscriptionInfo) {
317 cmd("EPSUB").arg(partition_key.to_string())
318 }
319
320 /// Subscribe to events from a partition by partition ID.
321 ///
322 /// # Parameters
323 /// - `partition_id`: Numeric partition identifier (0-65535)
324 ///
325 /// # Example
326 /// ```ignore
327 /// let subscription = conn.epsub_by_id(42)?;
328 /// ```
329 ///
330 /// # Returns
331 /// Returns subscription information.
332 fn epsub_by_id<>(partition_id: u16) -> (SubscriptionInfo) {
333 cmd("EPSUB").arg(partition_id)
334 }
335
336 /// Subscribe to events from a partition by partition key, starting from a specific sequence.
337 ///
338 /// # Parameters
339 /// - `partition_key`: UUID key to determine which partition to subscribe to
340 /// - `from_sequence`: Start streaming from this sequence number
341 ///
342 /// # Example
343 /// ```ignore
344 /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
345 /// let subscription = conn.epsub_by_key_from_sequence(partition_key, 1000)?;
346 /// ```
347 ///
348 /// # Returns
349 /// Returns subscription information.
350 fn epsub_by_key_from_sequence<>(partition_key: Uuid, from_sequence: u64) -> (SubscriptionInfo) {
351 cmd("EPSUB").arg(partition_key.to_string()).arg("FROM").arg(from_sequence)
352 }
353
354 /// Subscribe to events from a partition by partition ID, starting from a specific sequence.
355 ///
356 /// # Parameters
357 /// - `partition_id`: Numeric partition identifier (0-65535)
358 /// - `from_sequence`: Start streaming from this sequence number
359 ///
360 /// # Example
361 /// ```ignore
362 /// let subscription = conn.epsub_by_id_from_sequence(42, 1000)?;
363 /// ```
364 ///
365 /// # Returns
366 /// Returns subscription information.
367 fn epsub_by_id_from_sequence<>(partition_id: u16, from_sequence: u64) -> (SubscriptionInfo) {
368 cmd("EPSUB").arg(partition_id).arg("FROM").arg(from_sequence)
369 }
370
371 /// Handshake with the SierraDB server.
372 ///
373 /// # Parameters
374 /// - `version`: Protocol version (must be 3)
375 ///
376 /// # Example
377 /// ```ignore
378 /// let server_info = conn.hello(3)?;
379 /// ```
380 ///
381 /// # Returns
382 /// Returns server information including name, version, peer_id, and num_partitions.
383 fn hello<>(version: u32) -> (HelloResp) {
384 cmd("HELLO").arg(version)
385 }
386
    /// Health check command.
    ///
    /// Sends a bare `PING` with no arguments.
    ///
    /// # Example
    /// ```ignore
    /// let response = conn.ping()?;
    /// ```
    ///
    /// # Returns
    /// Returns "PONG" string.
    fn ping<>() -> (String) {
        // NOTE(review): with no `.arg(..)` call to end the chain, the command is
        // borrowed explicitly — presumably so this body has the same type as the
        // other entries; confirm against the `implement_commands!` definition.
        &mut cmd("PING")
    }
399
400 /// Acknowledge events up to a specific cursor for a subscription.
401 ///
402 /// # Parameters
403 /// - `subscription_id`: UUID of the subscription
404 /// - `cursor`: Cursor number to acknowledge up to
405 ///
406 /// # Example
407 /// ```ignore
408 /// let subscription_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
409 /// conn.eack(subscription_id, 100)?;
410 /// ```
411 ///
412 /// # Returns
413 /// Returns "OK" on success.
414 fn eack<>(subscription_id: Uuid, cursor: u64) -> (String) {
415 cmd("EACK").arg(subscription_id.to_string()).arg(cursor)
416 }
417}
418
/// Extension trait for Redis clients to provide SierraDB async subscription
/// functionality.
///
/// This trait provides convenience methods for creating typed subscriptions
/// using a `SubscriptionManager`.
pub trait SierraAsyncClientExt {
    /// Create a new subscription manager for this client.
    ///
    /// # Returns
    /// A future resolving to `RedisResult<SubscriptionManager>`. The return
    /// type is spelled out as `impl Future + Send`, so every implementation
    /// must hand back a `Send` future.
    fn subscription_manager(
        &self,
    ) -> impl std::future::Future<Output = RedisResult<SubscriptionManager>> + Send;
}
430
impl SierraAsyncClientExt for Client {
    /// Builds the manager by handing this client (by reference) to
    /// `SubscriptionManager::new`.
    fn subscription_manager(
        &self,
    ) -> impl std::future::Future<Output = RedisResult<SubscriptionManager>> + Send {
        // The value returned by `new` is passed through unchanged as this
        // method's future; nothing is awaited here.
        SubscriptionManager::new(self)
    }
}
438
439impl<T> Commands for T where T: ConnectionLike {}
440
441impl<T> TypedCommands for T where T: ConnectionLike {}
442
// Blanket impl, compiled only with the `aio` feature: every async connection
// type that is also `Send + Sync` implements `AsyncCommands`.
#[cfg(feature = "aio")]
impl<T: redis::aio::ConnectionLike + Send + Sync + Sized> AsyncCommands for T {}
445
// Blanket impl, compiled only with the `aio` feature: every async connection
// type that is also `Send + Sync` implements `AsyncTypedCommands`.
#[cfg(feature = "aio")]
impl<T: redis::aio::ConnectionLike + Send + Sync + Sized> AsyncTypedCommands for T {}