Skip to main content

smith_bus/
consumer.rs

1use anyhow::{Context, Result};
2use async_nats::jetstream::{self, consumer::PullConsumer};
3use serde::de::DeserializeOwned;
4use std::time::Duration;
5use time;
6use tracing::{debug, error, info};
7
8use crate::{ConsumerConfig, ConsumerStartSequence, Message, WorkQueue};
9
10/// Consumer for receiving messages from JetStream
11pub struct Consumer {
12    consumer: PullConsumer,
13    capability: String,
14    work_queue: WorkQueue,
15}
16
17impl Consumer {
18    /// Create a new consumer for a specific capability
19    pub async fn new(
20        jetstream: jetstream::Context,
21        capability: &str,
22        config: ConsumerConfig,
23    ) -> Result<Self> {
24        info!(
25            "Creating consumer for capability: {} with config: {:?}",
26            capability, config
27        );
28
29        let filter_subject = Self::build_filter_subject(capability);
30        let deliver_policy = Self::convert_start_sequence_to_policy(&config.start_sequence);
31        let consumer_config =
32            Self::build_consumer_config(&config, capability, &filter_subject, deliver_policy);
33
34        let consumer =
35            Self::create_jetstream_consumer(jetstream, consumer_config, capability).await?;
36        let work_queue = WorkQueue::new(consumer.clone(), 10, Duration::from_secs(5));
37
38        info!(
39            "Consumer created successfully for capability: {}",
40            capability
41        );
42
43        Ok(Self {
44            consumer,
45            capability: capability.to_string(),
46            work_queue,
47        })
48    }
49
50    /// Build filter subject for the capability
51    fn build_filter_subject(capability: &str) -> String {
52        crate::subjects::SubjectBuilder::new()
53            .part("intents")
54            .part(capability)
55            .part("*")
56            .build()
57    }
58
59    /// Convert start sequence configuration to JetStream deliver policy
60    fn convert_start_sequence_to_policy(
61        start_sequence: &ConsumerStartSequence,
62    ) -> jetstream::consumer::DeliverPolicy {
63        match start_sequence {
64            ConsumerStartSequence::First => jetstream::consumer::DeliverPolicy::All,
65            ConsumerStartSequence::Latest => jetstream::consumer::DeliverPolicy::Last,
66            ConsumerStartSequence::Sequence(seq) => {
67                jetstream::consumer::DeliverPolicy::ByStartSequence {
68                    start_sequence: *seq,
69                }
70            }
71            ConsumerStartSequence::Time(time) => jetstream::consumer::DeliverPolicy::ByStartTime {
72                start_time: time::OffsetDateTime::from_unix_timestamp(time.timestamp())
73                    .unwrap_or_else(|_| time::OffsetDateTime::now_utc()),
74            },
75        }
76    }
77
78    /// Build JetStream consumer configuration
79    fn build_consumer_config(
80        config: &ConsumerConfig,
81        capability: &str,
82        filter_subject: &str,
83        deliver_policy: jetstream::consumer::DeliverPolicy,
84    ) -> jetstream::consumer::pull::Config {
85        jetstream::consumer::pull::Config {
86            durable_name: Some(config.name.clone()),
87            description: Some(format!("Consumer for {} capability", capability)),
88            filter_subject: filter_subject.to_string(),
89            deliver_policy,
90            ack_wait: config.ack_wait,
91            max_deliver: config.max_deliver,
92            max_ack_pending: 1000, // Allow up to 1000 unacknowledged messages
93            replay_policy: jetstream::consumer::ReplayPolicy::Instant,
94            ..Default::default()
95        }
96    }
97
98    /// Create the JetStream consumer
99    async fn create_jetstream_consumer(
100        jetstream: jetstream::Context,
101        consumer_config: jetstream::consumer::pull::Config,
102        capability: &str,
103    ) -> Result<PullConsumer> {
104        let stream_name = "INTENTS";
105        jetstream
106            .create_consumer_on_stream(consumer_config, stream_name)
107            .await
108            .with_context(|| format!("Failed to create consumer for capability: {}", capability))
109    }
110
111    /// Get the next message from the stream
112    pub async fn next_message<T: DeserializeOwned>(&mut self) -> Result<Option<Message<T>>> {
113        match self.work_queue.pull_one().await? {
114            Some(jetstream_message) => self
115                .process_jetstream_message(jetstream_message)
116                .await
117                .map(Some),
118            None => {
119                debug!("No messages available for capability: {}", self.capability);
120                Ok(None)
121            }
122        }
123    }
124
125    /// Process a single JetStream message into a typed Message
126    async fn process_jetstream_message<T: DeserializeOwned>(
127        &self,
128        jetstream_message: async_nats::jetstream::Message,
129    ) -> Result<Message<T>> {
130        let _info = jetstream_message
131            .info()
132            .map_err(|e| anyhow::anyhow!("Failed to get message info: {}", e))?;
133
134        debug!("Received message on subject: {}", jetstream_message.subject);
135
136        // Deserialize the message
137        let payload: T = serde_json::from_slice(&jetstream_message.payload)
138            .with_context(|| "Failed to deserialize message payload")?;
139
140        debug!("Deserialized message for capability: {}", self.capability);
141
142        Ok(Message {
143            payload,
144            jetstream_message: jetstream_message.clone(),
145            delivery_count: 1, // Default to 1, actual redelivery info not easily accessible
146            subject: jetstream_message.subject.to_string(),
147        })
148    }
149
150    /// Get a batch of messages from the stream
151    pub async fn next_batch<T: DeserializeOwned>(
152        &mut self,
153        batch_size: usize,
154    ) -> Result<Vec<Message<T>>> {
155        let messages = self.work_queue.pull_batch().await?;
156        let mut typed_messages = Vec::with_capacity(messages.len().min(batch_size));
157
158        for jetstream_message in messages.into_iter().take(batch_size) {
159            match self.try_deserialize_message(&jetstream_message).await {
160                Ok(typed_message) => typed_messages.push(typed_message),
161                Err(e) => {
162                    error!("Failed to process message in batch: {}", e);
163                    Self::handle_malformed_message(jetstream_message).await;
164                }
165            }
166        }
167
168        debug!("Retrieved batch of {} valid messages", typed_messages.len());
169        Ok(typed_messages)
170    }
171
172    /// Try to deserialize a JetStream message into a typed Message
173    async fn try_deserialize_message<T: DeserializeOwned>(
174        &self,
175        jetstream_message: &async_nats::jetstream::Message,
176    ) -> Result<Message<T>> {
177        let payload: T = serde_json::from_slice(&jetstream_message.payload)
178            .with_context(|| "Failed to deserialize message payload")?;
179
180        debug!("Deserialized message for capability: {}", self.capability);
181
182        Ok(Message {
183            payload,
184            subject: jetstream_message.subject.to_string(),
185            jetstream_message: jetstream_message.clone(),
186            delivery_count: 1, // Default to 1, actual redelivery info not easily accessible
187        })
188    }
189
190    /// Handle malformed message by acknowledging it to prevent infinite redelivery
191    async fn handle_malformed_message(jetstream_message: async_nats::jetstream::Message) {
192        if let Err(ack_err) = jetstream_message.ack().await {
193            error!("Failed to ack malformed message: {}", ack_err);
194        }
195    }
196
197    /// Get consumer information and statistics
198    pub async fn info(&mut self) -> Result<ConsumerInfo> {
199        let info = self
200            .consumer
201            .info()
202            .await
203            .context("Failed to get consumer info")?;
204
205        Ok(ConsumerInfo {
206            name: info.name.clone(),
207            stream_name: info.stream_name.clone(),
208            delivered: info.delivered.stream_sequence, // Use stream sequence as approximation
209            ack_pending: info.num_pending,
210            redelivered: 0, // Not available in async-nats 0.42
211            num_waiting: info.num_waiting as u64,
212        })
213    }
214
215    /// Delete this consumer (cleanup)
216    /// Note: async-nats 0.42 doesn't support consumer deletion - consumers are auto-cleaned up
217    pub async fn delete(self) -> Result<()> {
218        info!("Marking consumer for cleanup: {}", self.capability);
219        // In async-nats 0.42, consumers are automatically cleaned up when dropped
220        // No explicit delete method is available
221        info!("Consumer cleanup completed (automatic)");
222        Ok(())
223    }
224}
225
/// Snapshot of a consumer's identity and delivery statistics.
#[derive(Debug, Clone)]
pub struct ConsumerInfo {
    /// Consumer name
    pub name: String,
    /// Name of the stream this consumer is attached to
    pub stream_name: String,
    /// Number of messages delivered
    pub delivered: u64,
    /// Number of messages pending acknowledgment
    pub ack_pending: u64,
    /// Number of messages redelivered
    pub redelivered: u64,
    /// Waiting count reported for the consumer.
    ///
    /// NOTE(review): filled from the JetStream consumer info `num_waiting`
    /// value - confirm whether that counts messages or outstanding pull
    /// requests.
    pub num_waiting: u64,
}

impl ConsumerInfo {
    /// Report whether the consumer looks healthy.
    ///
    /// The consumer is unhealthy once `ack_pending` reaches 100 or
    /// `num_waiting` reaches 1000; both limits are exclusive upper bounds.
    pub fn is_healthy(&self) -> bool {
        // Too many messages awaiting acknowledgment.
        if self.ack_pending >= 100 {
            return false;
        }

        // Otherwise health depends only on the waiting count.
        self.num_waiting < 1000
    }
}
251
#[cfg(test)]
mod tests {
    use super::*;
    use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy};

    /// Build a `ConsumerInfo` fixture that varies only in the two fields the
    /// health check reads.
    fn info_with(ack_pending: u64, num_waiting: u64) -> ConsumerInfo {
        ConsumerInfo {
            name: "test-consumer".to_string(),
            stream_name: "INTENTS".to_string(),
            delivered: 100,
            ack_pending,
            redelivered: 2,
            num_waiting,
        }
    }

    /// Health check: comfortably healthy, clearly unhealthy, and both
    /// thresholds hit exactly.
    #[test]
    fn test_consumer_info_health() {
        assert!(info_with(5, 10).is_healthy());

        // Too many pending
        assert!(!info_with(150, 10).is_healthy());

        // Edge cases: exactly at each threshold
        assert!(!info_with(100, 10).is_healthy());
        assert!(!info_with(5, 1000).is_healthy());
    }

    /// The filter subject carries the crate prefix, the capability and a wildcard.
    #[test]
    fn test_build_filter_subject() {
        assert_eq!(
            Consumer::build_filter_subject("fs.read.v1"),
            "smith.intents.fs.read.v1.*"
        );
    }

    /// Every start-sequence variant maps to the expected deliver policy.
    #[test]
    fn test_convert_start_sequence_to_policy() {
        let fixed_cases = [
            (ConsumerStartSequence::First, DeliverPolicy::All),
            (ConsumerStartSequence::Latest, DeliverPolicy::Last),
            (
                ConsumerStartSequence::Sequence(42),
                DeliverPolicy::ByStartSequence { start_sequence: 42 },
            ),
        ];
        for (start, expected) in fixed_cases {
            assert_eq!(Consumer::convert_start_sequence_to_policy(&start), expected);
        }

        // Time-based start: the policy carries the timestamp at second precision.
        let started_at = chrono::Utc::now();
        let expected_time = time::OffsetDateTime::from_unix_timestamp(started_at.timestamp())
            .unwrap_or_else(|_| time::OffsetDateTime::now_utc());
        let policy =
            Consumer::convert_start_sequence_to_policy(&ConsumerStartSequence::Time(started_at));
        assert_eq!(
            policy,
            DeliverPolicy::ByStartTime {
                start_time: expected_time
            }
        );
    }

    /// A fully populated config is carried over into the JetStream config.
    #[test]
    fn test_build_consumer_config() {
        let source = ConsumerConfig {
            name: "test-consumer".to_string(),
            consumer_group: Some("test-group".to_string()),
            start_sequence: ConsumerStartSequence::First,
            ack_wait: Duration::from_secs(30),
            max_deliver: 3,
            max_age: Some(Duration::from_secs(3600)),
            worker_count: 2,
        };

        let built = Consumer::build_consumer_config(
            &source,
            "fs.read.v1",
            "smith.intents.vetted.fs.read.v1",
            DeliverPolicy::All,
        );

        assert_eq!(built.durable_name, Some("test-consumer".to_string()));
        assert_eq!(built.deliver_policy, DeliverPolicy::All);
        assert_eq!(built.ack_wait, Duration::from_secs(30));
        assert_eq!(built.max_deliver, 3);
        assert_eq!(built.replay_policy, ReplayPolicy::Instant);
        assert_eq!(
            built.filter_subject,
            "smith.intents.vetted.fs.read.v1".to_string()
        );
        assert_eq!(
            built.description,
            Some("Consumer for fs.read.v1 capability".to_string())
        );
    }

    /// A config without the optional fields still maps name, policy and limits.
    #[test]
    fn test_build_consumer_config_with_default_name() {
        let source = ConsumerConfig {
            name: "http_fetch_v1_consumer".to_string(),
            consumer_group: None,
            start_sequence: ConsumerStartSequence::Latest,
            ack_wait: Duration::from_secs(60),
            max_deliver: 5,
            max_age: None,
            worker_count: 1,
        };

        let built = Consumer::build_consumer_config(
            &source,
            "http.fetch.v1",
            "smith.intents.vetted.http.fetch.v1",
            DeliverPolicy::Last,
        );

        assert_eq!(
            built.durable_name,
            Some("http_fetch_v1_consumer".to_string())
        );
        assert_eq!(built.deliver_policy, DeliverPolicy::Last);
        assert_eq!(built.ack_wait, Duration::from_secs(60));
        assert_eq!(built.max_deliver, 5);
    }

    /// The default configuration exposes the documented defaults.
    #[test]
    fn test_consumer_config_default() {
        let defaults = ConsumerConfig::default();

        assert!(!defaults.name.is_empty());
        assert!(defaults.name.contains("consumer-"));
        assert_eq!(defaults.consumer_group, None);
        assert_eq!(defaults.max_deliver, 3);
        assert_eq!(defaults.ack_wait, Duration::from_secs(30));
        assert_eq!(defaults.max_age, Some(Duration::from_secs(24 * 60 * 60)));
        assert!(matches!(
            defaults.start_sequence,
            ConsumerStartSequence::Latest
        ));
        assert_eq!(defaults.worker_count, 1);
    }

    /// All start-sequence variants can be constructed and Debug-formatted.
    #[test]
    fn test_consumer_start_sequence_variants() {
        // Construction of every variant must compile and not panic.
        let _first = ConsumerStartSequence::First;
        let _latest = ConsumerStartSequence::Latest;
        let _sequence = ConsumerStartSequence::Sequence(100);
        let _time = ConsumerStartSequence::Time(chrono::Utc::now());

        // Debug output names the variant and includes its payload.
        assert!(format!("{:?}", ConsumerStartSequence::First).contains("First"));
        assert!(format!("{:?}", ConsumerStartSequence::Sequence(42)).contains("42"));
    }

    /// Debug output of `ConsumerInfo` contains the identifying values.
    #[test]
    fn test_consumer_info_debug_format() {
        let rendered = format!(
            "{:?}",
            ConsumerInfo {
                name: "debug-test".to_string(),
                stream_name: "TEST_STREAM".to_string(),
                delivered: 42,
                ack_pending: 3,
                redelivered: 1,
                num_waiting: 7,
            }
        );

        for needle in ["debug-test", "TEST_STREAM", "42"] {
            assert!(rendered.contains(needle));
        }
    }

    /// Cloning `ConsumerInfo` preserves every field.
    #[test]
    fn test_consumer_info_clone() {
        let original = ConsumerInfo {
            name: "original".to_string(),
            stream_name: "STREAM".to_string(),
            delivered: 100,
            ack_pending: 5,
            redelivered: 2,
            num_waiting: 10,
        };
        let cloned = original.clone();

        assert_eq!(original.name, cloned.name);
        assert_eq!(original.stream_name, cloned.stream_name);
        assert_eq!(original.delivered, cloned.delivered);
        assert_eq!(original.ack_pending, cloned.ack_pending);
        assert_eq!(original.redelivered, cloned.redelivered);
        assert_eq!(original.num_waiting, cloned.num_waiting);
    }
}