1use std::marker::PhantomData;
4
5use crate::{codec::CodecType, error::{Error, Result}};
6
7use super::consumer::{PullConsumerBuilder, PushConsumerBuilder};
8
9#[derive(Clone)]
15pub struct Stream<C: CodecType> {
16 inner: async_nats::jetstream::stream::Stream,
17 _codec: PhantomData<C>,
18}
19
20impl<C: CodecType> Stream<C> {
21 pub(crate) fn new(inner: async_nats::jetstream::stream::Stream) -> Self {
23 Self {
24 inner,
25 _codec: PhantomData,
26 }
27 }
28
29 pub fn inner(&self) -> &async_nats::jetstream::stream::Stream {
31 &self.inner
32 }
33
34 pub fn name(&self) -> &str {
36 &self.inner.cached_info().config.name
37 }
38
39 pub async fn info(&self) -> Result<StreamInfo> {
41 let mut stream = self.inner.clone();
42 let info = stream
43 .info()
44 .await
45 .map_err(|e| Error::JetStreamStream(e.to_string()))?;
46 Ok(StreamInfo {
47 config: StreamConfig::from_native(&info.config),
48 state: StreamState {
49 messages: info.state.messages,
50 bytes: info.state.bytes,
51 first_sequence: info.state.first_sequence,
52 last_sequence: info.state.last_sequence,
53 consumer_count: info.state.consumer_count,
54 },
55 })
56 }
57
58 pub fn pull_consumer_builder<T>(&self, name: &str) -> PullConsumerBuilder<T, C> {
88 PullConsumerBuilder::new(self.inner.clone(), name.to_string())
89 }
90
91 pub fn push_consumer_builder<T>(&self, name: &str) -> PushConsumerBuilder<T, C> {
120 PushConsumerBuilder::new(self.inner.clone(), name.to_string())
121 }
122
123 pub async fn get_pull_consumer<T>(
125 &self,
126 name: &str,
127 ) -> Result<super::consumer::PullConsumer<T, C>> {
128 let inner: async_nats::jetstream::consumer::Consumer<
129 async_nats::jetstream::consumer::pull::Config,
130 > = self
131 .inner
132 .get_consumer(name)
133 .await
134 .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
135 Ok(super::consumer::PullConsumer::new(inner))
136 }
137
138 pub async fn get_push_consumer<T>(
140 &self,
141 name: &str,
142 ) -> Result<super::consumer::PushConsumer<T, C>> {
143 let inner: async_nats::jetstream::consumer::Consumer<
144 async_nats::jetstream::consumer::push::Config,
145 > = self
146 .inner
147 .get_consumer(name)
148 .await
149 .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
150 Ok(super::consumer::PushConsumer::new(inner))
151 }
152
153 pub async fn delete_consumer(&self, name: &str) -> Result<()> {
155 self.inner
156 .delete_consumer(name)
157 .await
158 .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
159 Ok(())
160 }
161
162 pub async fn purge(&self) -> Result<u64> {
164 let response = self
165 .inner
166 .clone()
167 .purge()
168 .await
169 .map_err(|e| Error::JetStreamStream(e.to_string()))?;
170 Ok(response.purged)
171 }
172
173 pub async fn purge_subject(&self, filter: &str) -> Result<u64> {
175 let response = self
176 .inner
177 .clone()
178 .purge()
179 .filter(filter)
180 .await
181 .map_err(|e| Error::JetStreamStream(e.to_string()))?;
182 Ok(response.purged)
183 }
184}
185
186pub struct StreamBuilder<C: CodecType> {
188 context: async_nats::jetstream::Context,
189 config: async_nats::jetstream::stream::Config,
190 _codec: PhantomData<C>,
191}
192
193impl<C: CodecType> StreamBuilder<C> {
194 pub(crate) fn new(context: async_nats::jetstream::Context, name: String) -> Self {
196 Self {
197 context,
198 config: async_nats::jetstream::stream::Config {
199 name,
200 ..Default::default()
201 },
202 _codec: PhantomData,
203 }
204 }
205
206 pub fn subjects(mut self, subjects: Vec<String>) -> Self {
208 self.config.subjects = subjects;
209 self
210 }
211
212 pub fn subject(mut self, subject: impl Into<String>) -> Self {
214 self.config.subjects.push(subject.into());
215 self
216 }
217
218 pub fn description(mut self, description: impl Into<String>) -> Self {
220 self.config.description = Some(description.into());
221 self
222 }
223
224 pub fn retention(mut self, retention: RetentionPolicy) -> Self {
226 self.config.retention = retention.into();
227 self
228 }
229
230 pub fn max_messages(mut self, max: i64) -> Self {
232 self.config.max_messages = max;
233 self
234 }
235
236 pub fn max_messages_per_subject(mut self, max: i64) -> Self {
238 self.config.max_messages_per_subject = max;
239 self
240 }
241
242 pub fn max_bytes(mut self, max: i64) -> Self {
244 self.config.max_bytes = max;
245 self
246 }
247
248 pub fn max_message_size(mut self, max: i32) -> Self {
250 self.config.max_message_size = max;
251 self
252 }
253
254 pub fn max_age(mut self, age: std::time::Duration) -> Self {
256 self.config.max_age = age;
257 self
258 }
259
260 pub fn max_consumers(mut self, max: i32) -> Self {
262 self.config.max_consumers = max;
263 self
264 }
265
266 pub fn replicas(mut self, replicas: usize) -> Self {
268 self.config.num_replicas = replicas;
269 self
270 }
271
272 pub fn storage(mut self, storage: StorageType) -> Self {
274 self.config.storage = storage.into();
275 self
276 }
277
278 pub fn discard_policy(mut self, policy: DiscardPolicy) -> Self {
280 self.config.discard = policy.into();
281 self
282 }
283
284 pub fn duplicate_window(mut self, window: std::time::Duration) -> Self {
286 self.config.duplicate_window = window;
287 self
288 }
289
290 pub fn allow_direct(mut self, allow: bool) -> Self {
292 self.config.allow_direct = allow;
293 self
294 }
295
296 pub fn mirror(mut self, source: StreamSource) -> Self {
298 self.config.mirror = Some(source.into());
299 self
300 }
301
302 pub fn add_source(mut self, source: StreamSource) -> Self {
304 self.config.sources.get_or_insert_with(Vec::new).push(source.into());
305 self
306 }
307
308 pub fn sealed(mut self, sealed: bool) -> Self {
310 self.config.sealed = sealed;
311 self
312 }
313
314 pub fn deny_delete(mut self, deny: bool) -> Self {
316 self.config.deny_delete = deny;
317 self
318 }
319
320 pub fn deny_purge(mut self, deny: bool) -> Self {
322 self.config.deny_purge = deny;
323 self
324 }
325
326 pub fn allow_rollup(mut self, allow: bool) -> Self {
328 self.config.allow_rollup = allow;
329 self
330 }
331
332 pub fn compression(mut self, compression: Compression) -> Self {
334 self.config.compression = Some(compression.into());
335 self
336 }
337
338 pub fn first_sequence(mut self, seq: u64) -> Self {
340 self.config.first_sequence = Some(seq);
341 self
342 }
343
344 pub fn subject_transform(mut self, source: &str, destination: &str) -> Self {
346 self.config.subject_transform = Some(async_nats::jetstream::stream::SubjectTransform {
347 source: source.to_string(),
348 destination: destination.to_string(),
349 });
350 self
351 }
352
353 pub async fn create(self) -> Result<Stream<C>> {
355 let inner = self
356 .context
357 .create_stream(self.config)
358 .await?;
359 Ok(Stream::new(inner))
360 }
361
362 pub async fn create_or_update(self) -> Result<Stream<C>> {
364 let inner = self
365 .context
366 .get_or_create_stream(self.config)
367 .await?;
368 Ok(Stream::new(inner))
369 }
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
374pub enum RetentionPolicy {
375 #[default]
377 Limits,
378 Interest,
380 WorkQueue,
382}
383
384impl From<RetentionPolicy> for async_nats::jetstream::stream::RetentionPolicy {
385 fn from(policy: RetentionPolicy) -> Self {
386 match policy {
387 RetentionPolicy::Limits => async_nats::jetstream::stream::RetentionPolicy::Limits,
388 RetentionPolicy::Interest => async_nats::jetstream::stream::RetentionPolicy::Interest,
389 RetentionPolicy::WorkQueue => async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
390 }
391 }
392}
393
394#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
396pub enum StorageType {
397 #[default]
399 File,
400 Memory,
402}
403
404impl From<StorageType> for async_nats::jetstream::stream::StorageType {
405 fn from(storage: StorageType) -> Self {
406 match storage {
407 StorageType::File => async_nats::jetstream::stream::StorageType::File,
408 StorageType::Memory => async_nats::jetstream::stream::StorageType::Memory,
409 }
410 }
411}
412
413#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
415pub enum DiscardPolicy {
416 #[default]
418 Old,
419 New,
421}
422
423impl From<DiscardPolicy> for async_nats::jetstream::stream::DiscardPolicy {
424 fn from(policy: DiscardPolicy) -> Self {
425 match policy {
426 DiscardPolicy::Old => async_nats::jetstream::stream::DiscardPolicy::Old,
427 DiscardPolicy::New => async_nats::jetstream::stream::DiscardPolicy::New,
428 }
429 }
430}
431
432#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
434pub enum Compression {
435 #[default]
437 None,
438 S2,
440}
441
442impl From<Compression> for async_nats::jetstream::stream::Compression {
443 fn from(compression: Compression) -> Self {
444 match compression {
445 Compression::None => async_nats::jetstream::stream::Compression::None,
446 Compression::S2 => async_nats::jetstream::stream::Compression::S2,
447 }
448 }
449}
450
451#[derive(Debug, Clone)]
453pub struct StreamSource {
454 pub name: String,
456 pub start_seq: Option<u64>,
458 pub start_time: Option<time::OffsetDateTime>,
460 pub filter_subject: Option<String>,
462}
463
464impl StreamSource {
465 pub fn new(name: impl Into<String>) -> Self {
467 Self {
468 name: name.into(),
469 start_seq: None,
470 start_time: None,
471 filter_subject: None,
472 }
473 }
474
475 pub fn start_seq(mut self, seq: u64) -> Self {
477 self.start_seq = Some(seq);
478 self
479 }
480
481 pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
483 self.start_time = Some(time);
484 self
485 }
486
487 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
489 self.filter_subject = Some(subject.into());
490 self
491 }
492}
493
494impl From<StreamSource> for async_nats::jetstream::stream::Source {
495 fn from(source: StreamSource) -> Self {
496 let mut s = async_nats::jetstream::stream::Source {
497 name: source.name,
498 ..Default::default()
499 };
500 if let Some(seq) = source.start_seq {
501 s.start_sequence = Some(seq);
502 }
503 if let Some(time) = source.start_time {
504 s.start_time = Some(time);
505 }
506 if let Some(subject) = source.filter_subject {
507 s.filter_subject = Some(subject);
508 }
509 s
510 }
511}
512
513#[derive(Debug, Clone)]
515pub struct StreamConfig {
516 pub name: String,
518 pub description: Option<String>,
520 pub subjects: Vec<String>,
522 pub retention: RetentionPolicy,
524 pub max_messages: i64,
526 pub max_bytes: i64,
528 pub max_age: std::time::Duration,
530 pub max_message_size: i32,
532 pub storage: StorageType,
534 pub replicas: usize,
536}
537
538impl StreamConfig {
539 pub(crate) fn from_native(config: &async_nats::jetstream::stream::Config) -> Self {
541 Self {
542 name: config.name.clone(),
543 description: config.description.clone(),
544 subjects: config.subjects.clone(),
545 retention: match config.retention {
546 async_nats::jetstream::stream::RetentionPolicy::Limits => RetentionPolicy::Limits,
547 async_nats::jetstream::stream::RetentionPolicy::Interest => {
548 RetentionPolicy::Interest
549 }
550 async_nats::jetstream::stream::RetentionPolicy::WorkQueue => {
551 RetentionPolicy::WorkQueue
552 }
553 },
554 max_messages: config.max_messages,
555 max_bytes: config.max_bytes,
556 max_age: config.max_age,
557 max_message_size: config.max_message_size,
558 storage: match config.storage {
559 async_nats::jetstream::stream::StorageType::File => StorageType::File,
560 async_nats::jetstream::stream::StorageType::Memory => StorageType::Memory,
561 },
562 replicas: config.num_replicas,
563 }
564 }
565}
566
567#[derive(Debug, Clone)]
569pub struct StreamState {
570 pub messages: u64,
572 pub bytes: u64,
574 pub first_sequence: u64,
576 pub last_sequence: u64,
578 pub consumer_count: usize,
580}
581
582#[derive(Debug, Clone)]
584pub struct StreamInfo {
585 pub config: StreamConfig,
587 pub state: StreamState,
589}