Skip to main content

sierradb_client/
commands.rs

1use redis::{Client, ConnectionLike, RedisResult, ToRedisArgs, cmd};
2use uuid::Uuid;
3
4use crate::HelloResp;
5use crate::options::{EAppendOptions, EMAppendEvent};
6use crate::subscription::SubscriptionManager;
7use crate::types::{AppendInfo, Event, EventBatch, MultiAppendInfo, RangeValue, SubscriptionInfo};
8
9implement_commands! {
10    'a
11
12    /// Append an event to a stream.
13    ///
14    /// # Parameters
15    /// - `stream_id`: Stream identifier to append the event to
16    /// - `event_name`: Name/type of the event
17    /// - `options`: Configuration for optional parameters (event_id, partition_key, expected_version, payload, metadata)
18    ///
19    /// # Example
20    /// ```ignore
21    /// let options = EAppendOptions::new()
22    ///     .payload(br#"{"name":"john"}"#)
23    ///     .metadata(br#"{"source":"api"}"#);
24    /// conn.eappend("my-stream", "UserCreated", options)?;
25    /// ```
26    ///
27    /// # Returns
28    /// Returns an `AppendInfo` containing event metadata.
29    fn eappend<S: ToRedisArgs, E: ToRedisArgs>(stream_id: S, event_name: E, options: EAppendOptions<'a>) -> (AppendInfo) {
30        cmd("EAPPEND").arg(stream_id).arg(event_name).arg(options)
31    }
32
33    /// Append multiple events to streams in a single transaction.
34    ///
35    /// # Parameters
36    /// - `partition_key`: UUID that determines which partition all events will be written to
37    /// - `events`: Array of events to append, each with their own configuration
38    ///
39    /// # Example
40    /// ```ignore
41    /// let events = [
42    ///     EMAppendEvent::new("stream1", "EventA").payload(br#"{"data":"value1"}"#),
43    ///     EMAppendEvent::new("stream2", "EventB").payload(br#"{"data":"value2"}"#),
44    /// ];
45    /// conn.emappend(partition_key, &events)?;
46    /// ```
47    ///
48    /// **Note:** All events are appended atomically in a single transaction.
49    ///
50    /// # Returns
51    /// Returns transaction result information.
52    fn emappend<>(partition_key: Uuid, events: &'a [EMAppendEvent<'a>]) -> (MultiAppendInfo) {
53        cmd("EMAPPEND").arg(partition_key.to_string()).arg(events)
54    }
55
56    /// Get an event by its unique identifier.
57    ///
58    /// # Parameters
59    /// - `event_id`: UUID of the event to retrieve
60    ///
61    /// # Example
62    /// ```ignore
63    /// let event_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
64    /// let event = conn.eget(event_id)?;
65    /// ```
66    ///
67    /// # Returns
68    /// Returns `Some(Event)` if found, `None` if the event doesn't exist.
69    fn eget<>(event_id: Uuid) -> (Option<Event>) {
70        cmd("EGET").arg(event_id.to_string())
71    }
72
73    /// Scan events in a partition by partition key.
74    ///
75    /// # Parameters
76    /// - `partition_key`: UUID key to determine which partition to scan
77    /// - `start_sequence`: Starting sequence number
78    /// - `end_sequence`: Ending sequence number (`None` means scan to end)
79    /// - `count`: Maximum number of events to return (defaults to 100)
80    ///
81    /// # Example
82    /// ```ignore
83    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
84    /// let batch = conn.epscan_by_key(partition_key, 0, Some(100), Some(50))?;
85    /// ```
86    ///
87    /// # Returns
88    /// Returns an `EventBatch` containing events and pagination info.
89    fn epscan_by_key<>(partition_key: Uuid, start_sequence: u64, end_sequence: Option<u64>, count: Option<u64>) -> (EventBatch) {
90        cmd("EPSCAN").arg(partition_key.to_string()).arg(start_sequence).arg(match end_sequence {
91            Some(seq) => RangeValue::Value(seq),
92            None => RangeValue::End,
93        }).arg("COUNT").arg(count.unwrap_or(100))
94    }
95
96    /// Scan events in a partition by partition ID.
97    ///
98    /// # Parameters
99    /// - `partition_id`: Numeric partition identifier (0-65535)
100    /// - `start_sequence`: Starting sequence number
101    /// - `end_sequence`: Ending sequence number (`None` means scan to end)
102    /// - `count`: Maximum number of events to return (defaults to 100)
103    ///
104    /// # Example
105    /// ```ignore
106    /// let batch = conn.epscan_by_id(42, 100, Some(200), Some(50))?;
107    /// ```
108    ///
109    /// # Returns
110    /// Returns an `EventBatch` containing events and pagination info.
111    fn epscan_by_id<>(partition_id: u16, start_sequence: u64, end_sequence: Option<u64>, count: Option<u64>) -> (EventBatch) {
112        cmd("EPSCAN").arg(partition_id).arg(start_sequence).arg(match end_sequence {
113            Some(seq) => RangeValue::Value(seq),
114            None => RangeValue::End,
115        }).arg("COUNT").arg(count.unwrap_or(100))
116    }
117
118    /// Scan events in a stream by stream ID.
119    ///
120    /// # Parameters
121    /// - `stream_id`: Stream identifier to scan
122    /// - `start_version`: Starting version number
123    /// - `end_version`: Ending version number (`None` means scan to end)
124    /// - `count`: Maximum number of events to return (defaults to 100)
125    ///
126    /// # Example
127    /// ```ignore
128    /// let events = conn.escan("my-stream", 0, Some(100), Some(50))?;
129    /// ```
130    ///
131    /// # Returns
132    /// Returns events from the stream across all partitions.
133    fn escan<>(stream_id: &'a str, start_version: u64, end_version: Option<u64>, count: Option<u64>) -> (EventBatch) {
134        cmd("ESCAN").arg(stream_id).arg(start_version).arg(match end_version {
135            Some(seq) => RangeValue::Value(seq),
136            None => RangeValue::End,
137        }).arg("COUNT").arg(count.unwrap_or(100))
138    }
139
140    /// Scan events in a stream by stream ID within a specific partition.
141    ///
142    /// # Parameters
143    /// - `stream_id`: Stream identifier to scan
144    /// - `partition_key`: UUID to scan within a specific partition only
145    /// - `start_version`: Starting version number
146    /// - `end_version`: Ending version number (`None` means scan to end)
147    /// - `count`: Maximum number of events to return (defaults to 100)
148    ///
149    /// # Example
150    /// ```ignore
151    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
152    /// let batch = conn.escan_with_partition_key("my-stream", partition_key, 0, Some(100), Some(50))?;
153    /// ```
154    ///
155    /// # Returns
156    /// Returns an `EventBatch` containing events from the specified partition only.
157    fn escan_with_partition_key<>(stream_id: &'a str, partition_key: Uuid, start_version: u64, end_version: Option<u64>, count: Option<u64>) -> (EventBatch) {
158        cmd("ESCAN").arg(stream_id).arg(start_version).arg(match end_version {
159            Some(seq) => RangeValue::Value(seq),
160            None => RangeValue::End,
161        }).arg("COUNT").arg(count.unwrap_or(100)).arg("PARTITION_KEY").arg(partition_key.to_string())
162    }
163
164    /// Get the current sequence number for a partition by partition key.
165    ///
166    /// # Parameters
167    /// - `partition_key`: UUID key to determine which partition to query
168    ///
169    /// # Example
170    /// ```ignore
171    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
172    /// let sequence = conn.epseq_by_key(partition_key)?;
173    /// ```
174    ///
175    /// # Returns
176    /// Returns the current sequence number for the partition.
177    fn epseq_by_key<>(partition_key: Uuid) -> (Option<u64>) {
178        cmd("EPSEQ").arg(partition_key.to_string())
179    }
180
181    /// Get the current sequence number for a partition by partition ID.
182    ///
183    /// # Parameters
184    /// - `partition_id`: Numeric partition identifier (0-65535)
185    ///
186    /// # Example
187    /// ```ignore
188    /// let sequence = conn.epseq_by_id(42)?;
189    /// ```
190    ///
191    /// # Returns
192    /// Returns the current sequence number for the partition.
193    fn epseq_by_id<>(partition_id: u16) -> (Option<u64>) {
194        cmd("EPSEQ").arg(partition_id)
195    }
196
197    /// Get the current version number for a stream.
198    ///
199    /// # Parameters
200    /// - `stream_id`: Stream identifier to get version for
201    ///
202    /// # Example
203    /// ```ignore
204    /// let version = conn.esver("my-stream")?;
205    /// ```
206    ///
207    /// # Returns
208    /// Returns the current version number for the stream.
209    fn esver<>(stream_id: &'a str) -> (Option<u64>) {
210        cmd("ESVER").arg(stream_id)
211    }
212
213    /// Get the current version number for a stream within a specific partition.
214    ///
215    /// # Parameters
216    /// - `stream_id`: Stream identifier to get version for
217    /// - `partition_key`: UUID to check specific partition
218    ///
219    /// # Example
220    /// ```ignore
221    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
222    /// let version = conn.esver_with_partition_key("my-stream", partition_key)?;
223    /// ```
224    ///
225    /// # Returns
226    /// Returns the current version number for the stream in the specified partition.
227    fn esver_with_partition_key<>(stream_id: &'a str, partition_key: Uuid) -> (Option<u64>) {
228        cmd("ESVER").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string())
229    }
230
231    /// Subscribe to events from a stream.
232    ///
233    /// # Parameters
234    /// - `stream_id`: Stream identifier to subscribe to
235    ///
236    /// # Example
237    /// ```ignore
238    /// let subscription = conn.esub("my-stream")?;
239    /// ```
240    ///
241    /// # Returns
242    /// Returns subscription information.
243    ///
244    /// **Note:** Establishes a persistent connection to receive real-time stream events.
245    fn esub<S: ToRedisArgs>(stream_id: S) -> (SubscriptionInfo) {
246        cmd("ESUB").arg(stream_id)
247    }
248
249    /// Subscribe to events from a stream with partition key.
250    ///
251    /// # Parameters
252    /// - `stream_id`: Stream identifier to subscribe to
253    /// - `partition_key`: UUID to subscribe to specific partition
254    ///
255    /// # Example
256    /// ```ignore
257    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
258    /// let subscription = conn.esub_with_partition_key("my-stream", partition_key)?;
259    /// ```
260    ///
261    /// # Returns
262    /// Returns subscription information.
263    fn esub_with_partition_key<S: ToRedisArgs>(stream_id: S, partition_key: Uuid) -> (SubscriptionInfo) {
264        cmd("ESUB").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string())
265    }
266
267    /// Subscribe to events from a stream starting from a specific version.
268    ///
269    /// # Parameters
270    /// - `stream_id`: Stream identifier to subscribe to
271    /// - `from_version`: Start streaming from this version number
272    ///
273    /// # Example
274    /// ```ignore
275    /// let subscription = conn.esub_from_version("my-stream", 100)?;
276    /// ```
277    ///
278    /// # Returns
279    /// Returns subscription information.
280    fn esub_from_version<S: ToRedisArgs>(stream_id: S, from_version: u64) -> (SubscriptionInfo) {
281        cmd("ESUB").arg(stream_id).arg("FROM").arg(from_version)
282    }
283
284    /// Subscribe to events from a stream with partition key and version.
285    ///
286    /// # Parameters
287    /// - `stream_id`: Stream identifier to subscribe to
288    /// - `partition_key`: UUID to subscribe to specific partition
289    /// - `from_version`: Starting version number
290    ///
291    /// # Example
292    /// ```ignore
293    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
294    /// let subscription = conn.esub_with_partition_and_version("my-stream", partition_key, 100)?;
295    /// ```
296    ///
297    /// # Returns
298    /// Returns subscription information.
299    fn esub_with_partition_and_version<S: ToRedisArgs>(stream_id: S, partition_key: Uuid, from_version: u64) -> (SubscriptionInfo) {
300        cmd("ESUB").arg(stream_id).arg("PARTITION_KEY").arg(partition_key.to_string()).arg("FROM").arg(from_version)
301    }
302
303    /// Subscribe to events from a partition by partition key.
304    ///
305    /// # Parameters
306    /// - `partition_key`: UUID key to determine which partition to subscribe to
307    ///
308    /// # Example
309    /// ```ignore
310    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
311    /// let subscription = conn.epsub_by_key(partition_key)?;
312    /// ```
313    ///
314    /// # Returns
315    /// Returns subscription information.
316    fn epsub_by_key<>(partition_key: Uuid) -> (SubscriptionInfo) {
317        cmd("EPSUB").arg(partition_key.to_string())
318    }
319
320    /// Subscribe to events from a partition by partition ID.
321    ///
322    /// # Parameters
323    /// - `partition_id`: Numeric partition identifier (0-65535)
324    ///
325    /// # Example
326    /// ```ignore
327    /// let subscription = conn.epsub_by_id(42)?;
328    /// ```
329    ///
330    /// # Returns
331    /// Returns subscription information.
332    fn epsub_by_id<>(partition_id: u16) -> (SubscriptionInfo) {
333        cmd("EPSUB").arg(partition_id)
334    }
335
336    /// Subscribe to events from a partition by partition key, starting from a specific sequence.
337    ///
338    /// # Parameters
339    /// - `partition_key`: UUID key to determine which partition to subscribe to
340    /// - `from_sequence`: Start streaming from this sequence number
341    ///
342    /// # Example
343    /// ```ignore
344    /// let partition_key = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
345    /// let subscription = conn.epsub_by_key_from_sequence(partition_key, 1000)?;
346    /// ```
347    ///
348    /// # Returns
349    /// Returns subscription information.
350    fn epsub_by_key_from_sequence<>(partition_key: Uuid, from_sequence: u64) -> (SubscriptionInfo) {
351        cmd("EPSUB").arg(partition_key.to_string()).arg("FROM").arg(from_sequence)
352    }
353
354    /// Subscribe to events from a partition by partition ID, starting from a specific sequence.
355    ///
356    /// # Parameters
357    /// - `partition_id`: Numeric partition identifier (0-65535)
358    /// - `from_sequence`: Start streaming from this sequence number
359    ///
360    /// # Example
361    /// ```ignore
362    /// let subscription = conn.epsub_by_id_from_sequence(42, 1000)?;
363    /// ```
364    ///
365    /// # Returns
366    /// Returns subscription information.
367    fn epsub_by_id_from_sequence<>(partition_id: u16, from_sequence: u64) -> (SubscriptionInfo) {
368        cmd("EPSUB").arg(partition_id).arg("FROM").arg(from_sequence)
369    }
370
371    /// Handshake with the SierraDB server.
372    ///
373    /// # Parameters
374    /// - `version`: Protocol version (must be 3)
375    ///
376    /// # Example
377    /// ```ignore
378    /// let server_info = conn.hello(3)?;
379    /// ```
380    ///
381    /// # Returns
382    /// Returns server information including name, version, peer_id, and num_partitions.
383    fn hello<>(version: u32) -> (HelloResp) {
384        cmd("HELLO").arg(version)
385    }
386
387    /// Health check command.
388    ///
389    /// # Example
390    /// ```ignore
391    /// let response = conn.ping()?;
392    /// ```
393    ///
394    /// # Returns
395    /// Returns "PONG" string.
396    fn ping<>() -> (String) {
397        &mut cmd("PING")
398    }
399
400    /// Acknowledge events up to a specific cursor for a subscription.
401    ///
402    /// # Parameters
403    /// - `subscription_id`: UUID of the subscription
404    /// - `cursor`: Cursor number to acknowledge up to
405    ///
406    /// # Example
407    /// ```ignore
408    /// let subscription_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
409    /// conn.eack(subscription_id, 100)?;
410    /// ```
411    ///
412    /// # Returns
413    /// Returns "OK" on success.
414    fn eack<>(subscription_id: Uuid, cursor: u64) -> (String) {
415        cmd("EACK").arg(subscription_id.to_string()).arg(cursor)
416    }
417}
418
419/// Extension trait for Redis clients to provide SierraDB async subscription
420/// functionality.
421///
422/// This trait provides convenience methods for creating typed subscriptions
423/// using a `SubscriptionManager`.
424pub trait SierraAsyncClientExt {
425    /// Create a new subscription manager for this client.
426    fn subscription_manager(
427        &self,
428    ) -> impl std::future::Future<Output = RedisResult<SubscriptionManager>> + Send;
429}
430
431impl SierraAsyncClientExt for Client {
432    fn subscription_manager(
433        &self,
434    ) -> impl std::future::Future<Output = RedisResult<SubscriptionManager>> + Send {
435        SubscriptionManager::new(self)
436    }
437}
438
439impl<T> Commands for T where T: ConnectionLike {}
440
441impl<T> TypedCommands for T where T: ConnectionLike {}
442
// Blanket impl, compiled only with the `aio` feature: every async
// `redis::aio::ConnectionLike` type that is `Send + Sync` gets the
// `AsyncCommands` methods.
#[cfg(feature = "aio")]
impl<T: redis::aio::ConnectionLike + Send + Sync + Sized> AsyncCommands for T {}
445
// Blanket impl, compiled only with the `aio` feature: every async
// `redis::aio::ConnectionLike` type that is `Send + Sync` gets the
// `AsyncTypedCommands` methods.
#[cfg(feature = "aio")]
impl<T: redis::aio::ConnectionLike + Send + Sync + Sized> AsyncTypedCommands for T {}