1use anyhow::{Context, Result};
2use async_nats::jetstream::{self, consumer::PullConsumer};
3use serde::de::DeserializeOwned;
4use std::time::Duration;
5use time;
6use tracing::{debug, error, info};
7
8use crate::{ConsumerConfig, ConsumerStartSequence, Message, WorkQueue};
9
10pub struct Consumer {
12 consumer: PullConsumer,
13 capability: String,
14 work_queue: WorkQueue,
15}
16
17impl Consumer {
18 pub async fn new(
20 jetstream: jetstream::Context,
21 capability: &str,
22 config: ConsumerConfig,
23 ) -> Result<Self> {
24 info!(
25 "Creating consumer for capability: {} with config: {:?}",
26 capability, config
27 );
28
29 let filter_subject = Self::build_filter_subject(capability);
30 let deliver_policy = Self::convert_start_sequence_to_policy(&config.start_sequence);
31 let consumer_config =
32 Self::build_consumer_config(&config, capability, &filter_subject, deliver_policy);
33
34 let consumer =
35 Self::create_jetstream_consumer(jetstream, consumer_config, capability).await?;
36 let work_queue = WorkQueue::new(consumer.clone(), 10, Duration::from_secs(5));
37
38 info!(
39 "Consumer created successfully for capability: {}",
40 capability
41 );
42
43 Ok(Self {
44 consumer,
45 capability: capability.to_string(),
46 work_queue,
47 })
48 }
49
50 fn build_filter_subject(capability: &str) -> String {
52 crate::subjects::SubjectBuilder::new()
53 .part("intents")
54 .part(capability)
55 .part("*")
56 .build()
57 }
58
59 fn convert_start_sequence_to_policy(
61 start_sequence: &ConsumerStartSequence,
62 ) -> jetstream::consumer::DeliverPolicy {
63 match start_sequence {
64 ConsumerStartSequence::First => jetstream::consumer::DeliverPolicy::All,
65 ConsumerStartSequence::Latest => jetstream::consumer::DeliverPolicy::Last,
66 ConsumerStartSequence::Sequence(seq) => {
67 jetstream::consumer::DeliverPolicy::ByStartSequence {
68 start_sequence: *seq,
69 }
70 }
71 ConsumerStartSequence::Time(time) => jetstream::consumer::DeliverPolicy::ByStartTime {
72 start_time: time::OffsetDateTime::from_unix_timestamp(time.timestamp())
73 .unwrap_or_else(|_| time::OffsetDateTime::now_utc()),
74 },
75 }
76 }
77
78 fn build_consumer_config(
80 config: &ConsumerConfig,
81 capability: &str,
82 filter_subject: &str,
83 deliver_policy: jetstream::consumer::DeliverPolicy,
84 ) -> jetstream::consumer::pull::Config {
85 jetstream::consumer::pull::Config {
86 durable_name: Some(config.name.clone()),
87 description: Some(format!("Consumer for {} capability", capability)),
88 filter_subject: filter_subject.to_string(),
89 deliver_policy,
90 ack_wait: config.ack_wait,
91 max_deliver: config.max_deliver,
92 max_ack_pending: 1000, replay_policy: jetstream::consumer::ReplayPolicy::Instant,
94 ..Default::default()
95 }
96 }
97
98 async fn create_jetstream_consumer(
100 jetstream: jetstream::Context,
101 consumer_config: jetstream::consumer::pull::Config,
102 capability: &str,
103 ) -> Result<PullConsumer> {
104 let stream_name = "INTENTS";
105 jetstream
106 .create_consumer_on_stream(consumer_config, stream_name)
107 .await
108 .with_context(|| format!("Failed to create consumer for capability: {}", capability))
109 }
110
111 pub async fn next_message<T: DeserializeOwned>(&mut self) -> Result<Option<Message<T>>> {
113 match self.work_queue.pull_one().await? {
114 Some(jetstream_message) => self
115 .process_jetstream_message(jetstream_message)
116 .await
117 .map(Some),
118 None => {
119 debug!("No messages available for capability: {}", self.capability);
120 Ok(None)
121 }
122 }
123 }
124
125 async fn process_jetstream_message<T: DeserializeOwned>(
127 &self,
128 jetstream_message: async_nats::jetstream::Message,
129 ) -> Result<Message<T>> {
130 let _info = jetstream_message
131 .info()
132 .map_err(|e| anyhow::anyhow!("Failed to get message info: {}", e))?;
133
134 debug!("Received message on subject: {}", jetstream_message.subject);
135
136 let payload: T = serde_json::from_slice(&jetstream_message.payload)
138 .with_context(|| "Failed to deserialize message payload")?;
139
140 debug!("Deserialized message for capability: {}", self.capability);
141
142 Ok(Message {
143 payload,
144 jetstream_message: jetstream_message.clone(),
145 delivery_count: 1, subject: jetstream_message.subject.to_string(),
147 })
148 }
149
150 pub async fn next_batch<T: DeserializeOwned>(
152 &mut self,
153 batch_size: usize,
154 ) -> Result<Vec<Message<T>>> {
155 let messages = self.work_queue.pull_batch().await?;
156 let mut typed_messages = Vec::with_capacity(messages.len().min(batch_size));
157
158 for jetstream_message in messages.into_iter().take(batch_size) {
159 match self.try_deserialize_message(&jetstream_message).await {
160 Ok(typed_message) => typed_messages.push(typed_message),
161 Err(e) => {
162 error!("Failed to process message in batch: {}", e);
163 Self::handle_malformed_message(jetstream_message).await;
164 }
165 }
166 }
167
168 debug!("Retrieved batch of {} valid messages", typed_messages.len());
169 Ok(typed_messages)
170 }
171
172 async fn try_deserialize_message<T: DeserializeOwned>(
174 &self,
175 jetstream_message: &async_nats::jetstream::Message,
176 ) -> Result<Message<T>> {
177 let payload: T = serde_json::from_slice(&jetstream_message.payload)
178 .with_context(|| "Failed to deserialize message payload")?;
179
180 debug!("Deserialized message for capability: {}", self.capability);
181
182 Ok(Message {
183 payload,
184 subject: jetstream_message.subject.to_string(),
185 jetstream_message: jetstream_message.clone(),
186 delivery_count: 1, })
188 }
189
190 async fn handle_malformed_message(jetstream_message: async_nats::jetstream::Message) {
192 if let Err(ack_err) = jetstream_message.ack().await {
193 error!("Failed to ack malformed message: {}", ack_err);
194 }
195 }
196
197 pub async fn info(&mut self) -> Result<ConsumerInfo> {
199 let info = self
200 .consumer
201 .info()
202 .await
203 .context("Failed to get consumer info")?;
204
205 Ok(ConsumerInfo {
206 name: info.name.clone(),
207 stream_name: info.stream_name.clone(),
208 delivered: info.delivered.stream_sequence, ack_pending: info.num_pending,
210 redelivered: 0, num_waiting: info.num_waiting as u64,
212 })
213 }
214
215 pub async fn delete(self) -> Result<()> {
218 info!("Marking consumer for cleanup: {}", self.capability);
219 info!("Consumer cleanup completed (automatic)");
222 Ok(())
223 }
224}
225
226#[derive(Debug, Clone)]
228pub struct ConsumerInfo {
229 pub name: String,
231 pub stream_name: String,
233 pub delivered: u64,
235 pub ack_pending: u64,
237 pub redelivered: u64,
239 pub num_waiting: u64,
241}
242
243impl ConsumerInfo {
244 pub fn is_healthy(&self) -> bool {
246 self.ack_pending < 100 && self.num_waiting < 1000
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy};
256
257 #[test]
258 fn test_consumer_info_health() {
259 let healthy_info = ConsumerInfo {
260 name: "test-consumer".to_string(),
261 stream_name: "INTENTS".to_string(),
262 delivered: 100,
263 ack_pending: 5,
264 redelivered: 2,
265 num_waiting: 10,
266 };
267
268 assert!(healthy_info.is_healthy());
269
270 let unhealthy_info = ConsumerInfo {
271 name: "test-consumer".to_string(),
272 stream_name: "INTENTS".to_string(),
273 delivered: 100,
274 ack_pending: 150, redelivered: 2,
276 num_waiting: 10,
277 };
278
279 assert!(!unhealthy_info.is_healthy());
280
281 let edge_case_pending = ConsumerInfo {
283 name: "test-consumer".to_string(),
284 stream_name: "INTENTS".to_string(),
285 delivered: 100,
286 ack_pending: 100, redelivered: 2,
288 num_waiting: 10,
289 };
290 assert!(!edge_case_pending.is_healthy());
291
292 let edge_case_waiting = ConsumerInfo {
293 name: "test-consumer".to_string(),
294 stream_name: "INTENTS".to_string(),
295 delivered: 100,
296 ack_pending: 5,
297 redelivered: 2,
298 num_waiting: 1000, };
300 assert!(!edge_case_waiting.is_healthy());
301 }
302
303 #[test]
304 fn test_build_filter_subject() {
305 let subject = Consumer::build_filter_subject("fs.read.v1");
306 assert_eq!(subject, "smith.intents.fs.read.v1.*");
307 }
308
309 #[test]
310 fn test_convert_start_sequence_to_policy() {
311 let policy = Consumer::convert_start_sequence_to_policy(&ConsumerStartSequence::First);
312 assert_eq!(policy, DeliverPolicy::All);
313
314 let policy = Consumer::convert_start_sequence_to_policy(&ConsumerStartSequence::Latest);
315 assert_eq!(policy, DeliverPolicy::Last);
316
317 let policy =
318 Consumer::convert_start_sequence_to_policy(&ConsumerStartSequence::Sequence(42));
319 assert_eq!(
320 policy,
321 DeliverPolicy::ByStartSequence { start_sequence: 42 }
322 );
323
324 let time = chrono::Utc::now();
325 let policy = Consumer::convert_start_sequence_to_policy(&ConsumerStartSequence::Time(time));
326 let expected_time = time::OffsetDateTime::from_unix_timestamp(time.timestamp())
327 .unwrap_or_else(|_| time::OffsetDateTime::now_utc());
328 assert_eq!(
329 policy,
330 DeliverPolicy::ByStartTime {
331 start_time: expected_time
332 }
333 );
334 }
335
336 #[test]
337 fn test_build_consumer_config() {
338 let config = ConsumerConfig {
339 name: "test-consumer".to_string(),
340 consumer_group: Some("test-group".to_string()),
341 start_sequence: ConsumerStartSequence::First,
342 ack_wait: Duration::from_secs(30),
343 max_deliver: 3,
344 max_age: Some(Duration::from_secs(3600)),
345 worker_count: 2,
346 };
347
348 let consumer_config = Consumer::build_consumer_config(
349 &config,
350 "fs.read.v1",
351 "smith.intents.vetted.fs.read.v1",
352 DeliverPolicy::All,
353 );
354
355 assert_eq!(
356 consumer_config.durable_name,
357 Some("test-consumer".to_string())
358 );
359 assert_eq!(consumer_config.deliver_policy, DeliverPolicy::All);
360 assert_eq!(consumer_config.ack_wait, Duration::from_secs(30));
361 assert_eq!(consumer_config.max_deliver, 3);
362 assert_eq!(consumer_config.replay_policy, ReplayPolicy::Instant);
363 assert_eq!(
364 consumer_config.filter_subject,
365 "smith.intents.vetted.fs.read.v1".to_string()
366 );
367 assert_eq!(
368 consumer_config.description,
369 Some("Consumer for fs.read.v1 capability".to_string())
370 );
371 }
372
373 #[test]
374 fn test_build_consumer_config_with_default_name() {
375 let config = ConsumerConfig {
376 name: "http_fetch_v1_consumer".to_string(),
377 consumer_group: None,
378 start_sequence: ConsumerStartSequence::Latest,
379 ack_wait: Duration::from_secs(60),
380 max_deliver: 5,
381 max_age: None,
382 worker_count: 1,
383 };
384
385 let consumer_config = Consumer::build_consumer_config(
386 &config,
387 "http.fetch.v1",
388 "smith.intents.vetted.http.fetch.v1",
389 DeliverPolicy::Last,
390 );
391
392 assert_eq!(
393 consumer_config.durable_name,
394 Some("http_fetch_v1_consumer".to_string())
395 );
396 assert_eq!(consumer_config.deliver_policy, DeliverPolicy::Last);
397 assert_eq!(consumer_config.ack_wait, Duration::from_secs(60));
398 assert_eq!(consumer_config.max_deliver, 5);
399 }
400
401 #[test]
402 fn test_consumer_config_default() {
403 let config = ConsumerConfig::default();
404
405 assert!(!config.name.is_empty());
406 assert!(config.name.contains("consumer-"));
407 assert_eq!(config.consumer_group, None);
408 assert_eq!(config.max_deliver, 3);
409 assert_eq!(config.ack_wait, Duration::from_secs(30));
410 assert_eq!(config.max_age, Some(Duration::from_secs(24 * 60 * 60)));
411 assert!(matches!(
412 config.start_sequence,
413 ConsumerStartSequence::Latest
414 ));
415 assert_eq!(config.worker_count, 1);
416 }
417
418 #[test]
419 fn test_consumer_start_sequence_variants() {
420 let _first = ConsumerStartSequence::First;
422 let _latest = ConsumerStartSequence::Latest;
423 let _sequence = ConsumerStartSequence::Sequence(100);
424 let _time = ConsumerStartSequence::Time(chrono::Utc::now());
425
426 let first_debug = format!("{:?}", ConsumerStartSequence::First);
428 assert!(first_debug.contains("First"));
429
430 let seq_debug = format!("{:?}", ConsumerStartSequence::Sequence(42));
431 assert!(seq_debug.contains("42"));
432 }
433
434 #[test]
435 fn test_consumer_info_debug_format() {
436 let info = ConsumerInfo {
437 name: "debug-test".to_string(),
438 stream_name: "TEST_STREAM".to_string(),
439 delivered: 42,
440 ack_pending: 3,
441 redelivered: 1,
442 num_waiting: 7,
443 };
444
445 let debug_output = format!("{:?}", info);
446 assert!(debug_output.contains("debug-test"));
447 assert!(debug_output.contains("TEST_STREAM"));
448 assert!(debug_output.contains("42"));
449 }
450
451 #[test]
452 fn test_consumer_info_clone() {
453 let original = ConsumerInfo {
454 name: "original".to_string(),
455 stream_name: "STREAM".to_string(),
456 delivered: 100,
457 ack_pending: 5,
458 redelivered: 2,
459 num_waiting: 10,
460 };
461
462 let cloned = original.clone();
463 assert_eq!(original.name, cloned.name);
464 assert_eq!(original.stream_name, cloned.stream_name);
465 assert_eq!(original.delivered, cloned.delivered);
466 assert_eq!(original.ack_pending, cloned.ack_pending);
467 assert_eq!(original.redelivered, cloned.redelivered);
468 assert_eq!(original.num_waiting, cloned.num_waiting);
469 }
470}