1use crate::config::{MessageBroker, ProjectConfig};
7
/// Converts a separator-delimited name into `PascalCase`.
///
/// Every non-alphanumeric character (`-`, `_`, spaces, dots, ...) is treated
/// as a word boundary and dropped, so the result never contains separator
/// characters that would be invalid inside a Rust identifier. The first
/// character of each word is upper-cased; the remaining characters are kept
/// as they are. Empty words (from leading, trailing or repeated separators)
/// contribute nothing.
///
/// `order-processor` becomes `OrderProcessor`, `user_handler` becomes
/// `UserHandler`.
fn to_pascal_case(s: &str) -> String {
    s.split(|c: char| !c.is_alphanumeric())
        .map(|word| {
            let mut chars = word.chars();
            match chars.next() {
                // Empty segment between two adjacent separators.
                None => String::new(),
                // `to_uppercase` may yield several chars, hence the chain.
                Some(first) => first.to_uppercase().chain(chars).collect(),
            }
        })
        .collect()
}
20
21pub fn cargo_toml(config: &ProjectConfig) -> String {
23 let consumer = config.consumer.as_ref().unwrap();
24 let name = &config.name;
25
26 let broker_deps = match consumer.broker {
27 MessageBroker::Kafka => r#"rdkafka = { version = "0.36", features = ["cmake-build"] }"#,
28 MessageBroker::RabbitMq => r#"lapin = "2.3""#,
29 MessageBroker::Redis => r#"redis = { version = "0.25", features = ["tokio-comp", "streams"] }"#,
30 MessageBroker::Sqs => r#"aws-sdk-sqs = "1.0""#,
31 MessageBroker::PubSub => r#"google-cloud-pubsub = "0.25""#,
32 };
33
34 format!(
35 r#"[package]
36name = "{name}"
37version = "0.1.0"
38edition = "2021"
39rust-version = "1.75"
40description = "{display_name}"
41
42[dependencies]
43# AllFrame
44allframe-core = {{ version = "0.1", features = ["resilience", "otel"] }}
45
46# Message Broker
47{broker_deps}
48
49# Async
50tokio = {{ version = "1", features = ["full"] }}
51async-trait = "0.1"
52futures = "0.3"
53
54# Serialization
55serde = {{ version = "1.0", features = ["derive"] }}
56serde_json = "1.0"
57
58# Error handling
59thiserror = "2.0"
60anyhow = "1.0"
61
62# Tracing & Metrics
63tracing = "0.1"
64tracing-subscriber = {{ version = "0.3", features = ["env-filter"] }}
65opentelemetry = {{ version = "0.27", features = ["metrics"] }}
66
67# Utilities
68chrono = {{ version = "0.4", features = ["serde"] }}
69uuid = {{ version = "1.0", features = ["v4", "serde"] }}
70dotenvy = "0.15"
71
72# Health checks
73axum = "0.7"
74
75[dev-dependencies]
76tokio-test = "0.4"
77mockall = "0.13"
78
79[[bin]]
80name = "{name}"
81path = "src/main.rs"
82"#,
83 name = name,
84 display_name = consumer.display_name,
85 broker_deps = broker_deps,
86 )
87}
88
89pub fn main_rs(config: &ProjectConfig) -> String {
91 let consumer = config.consumer.as_ref().unwrap();
92 let pascal_name = to_pascal_case(&consumer.service_name);
93
94 format!(
95 r#"//! {display_name}
96//!
97//! An event consumer service with idempotency, retry, and dead letter queue handling.
98
99use std::sync::Arc;
100use tracing::info;
101
102mod config;
103mod error;
104mod domain;
105mod application;
106mod infrastructure;
107
108use config::Config;
109use application::{pascal_name}Consumer;
110use infrastructure::{{
111 KafkaMessageBroker,
112 InMemoryIdempotencyStore,
113 HealthServer,
114}};
115
116#[tokio::main]
117async fn main() -> anyhow::Result<()> {{
118 // Load environment variables
119 dotenvy::dotenv().ok();
120
121 // Initialize tracing
122 tracing_subscriber::fmt()
123 .with_env_filter(
124 tracing_subscriber::EnvFilter::from_default_env()
125 .add_directive(tracing::Level::INFO.into()),
126 )
127 .init();
128
129 // Load configuration
130 let config = Config::from_env();
131 info!("Starting {display_name}");
132 info!("Broker: {{}}", config.broker.brokers);
133 info!("Topics: {{:?}}", config.topics);
134
135 // Create idempotency store
136 let idempotency_store = Arc::new(InMemoryIdempotencyStore::new());
137
138 // Create message broker
139 let broker = KafkaMessageBroker::new(&config.broker).await?;
140
141 // Create consumer
142 let consumer = {pascal_name}Consumer::new(
143 broker,
144 idempotency_store,
145 config.retry.clone(),
146 config.dlq.clone(),
147 );
148
149 // Start health server in background
150 let health_handle = tokio::spawn(async move {{
151 let health_server = HealthServer::new(config.server.health_port);
152 health_server.run().await
153 }});
154
155 // Run consumer
156 info!("Consumer started, waiting for messages...");
157 consumer.run(&config.topics).await?;
158
159 health_handle.abort();
160 info!("Consumer shutdown complete");
161 Ok(())
162}}
163"#,
164 display_name = consumer.display_name,
165 pascal_name = pascal_name,
166 )
167}
168
169pub fn config_rs(config: &ProjectConfig) -> String {
171 let consumer = config.consumer.as_ref().unwrap();
172 let service_name = &consumer.service_name;
173 let upper_name = service_name.to_uppercase().replace('-', "_");
174
175 format!(
176 r#"//! Configuration module
177//!
178//! Loads configuration from environment variables.
179
180use std::time::Duration;
181
182/// Main configuration
183#[derive(Debug, Clone)]
184pub struct Config {{
185 pub broker: BrokerConfig,
186 pub topics: Vec<String>,
187 pub group_id: String,
188 pub retry: RetryConfig,
189 pub dlq: DlqConfig,
190 pub server: ServerConfig,
191}}
192
193/// Message broker configuration
194#[derive(Debug, Clone)]
195pub struct BrokerConfig {{
196 pub brokers: String,
197 pub security_protocol: String,
198 pub sasl_mechanism: Option<String>,
199 pub sasl_username: Option<String>,
200 pub sasl_password: Option<String>,
201}}
202
203/// Retry configuration
204#[derive(Debug, Clone)]
205pub struct RetryConfig {{
206 pub max_attempts: u32,
207 pub initial_backoff: Duration,
208 pub max_backoff: Duration,
209 pub multiplier: f64,
210}}
211
212/// Dead Letter Queue configuration
213#[derive(Debug, Clone)]
214pub struct DlqConfig {{
215 pub enabled: bool,
216 pub suffix: String,
217}}
218
219/// Server configuration
220#[derive(Debug, Clone)]
221pub struct ServerConfig {{
222 pub health_port: u16,
223 pub metrics_port: u16,
224}}
225
226impl Config {{
227 pub fn from_env() -> Self {{
228 Self {{
229 broker: BrokerConfig {{
230 brokers: std::env::var("{upper_name}_BROKERS")
231 .unwrap_or_else(|_| "localhost:9092".to_string()),
232 security_protocol: std::env::var("{upper_name}_SECURITY_PROTOCOL")
233 .unwrap_or_else(|_| "PLAINTEXT".to_string()),
234 sasl_mechanism: std::env::var("{upper_name}_SASL_MECHANISM").ok(),
235 sasl_username: std::env::var("{upper_name}_SASL_USERNAME").ok(),
236 sasl_password: std::env::var("{upper_name}_SASL_PASSWORD").ok(),
237 }},
238 topics: std::env::var("{upper_name}_TOPICS")
239 .unwrap_or_else(|_| "events".to_string())
240 .split(',')
241 .map(|s| s.trim().to_string())
242 .collect(),
243 group_id: std::env::var("{upper_name}_GROUP_ID")
244 .unwrap_or_else(|_| "{service_name}-group".to_string()),
245 retry: RetryConfig {{
246 max_attempts: std::env::var("{upper_name}_RETRY_MAX_ATTEMPTS")
247 .ok()
248 .and_then(|v| v.parse().ok())
249 .unwrap_or({max_attempts}),
250 initial_backoff: Duration::from_millis(
251 std::env::var("{upper_name}_RETRY_INITIAL_BACKOFF_MS")
252 .ok()
253 .and_then(|v| v.parse().ok())
254 .unwrap_or({initial_backoff_ms}),
255 ),
256 max_backoff: Duration::from_millis(
257 std::env::var("{upper_name}_RETRY_MAX_BACKOFF_MS")
258 .ok()
259 .and_then(|v| v.parse().ok())
260 .unwrap_or({max_backoff_ms}),
261 ),
262 multiplier: std::env::var("{upper_name}_RETRY_MULTIPLIER")
263 .ok()
264 .and_then(|v| v.parse().ok())
265 .unwrap_or({multiplier}),
266 }},
267 dlq: DlqConfig {{
268 enabled: std::env::var("{upper_name}_DLQ_ENABLED")
269 .ok()
270 .and_then(|v| v.parse().ok())
271 .unwrap_or(true),
272 suffix: std::env::var("{upper_name}_DLQ_SUFFIX")
273 .unwrap_or_else(|_| ".dlq".to_string()),
274 }},
275 server: ServerConfig {{
276 health_port: std::env::var("{upper_name}_HEALTH_PORT")
277 .ok()
278 .and_then(|v| v.parse().ok())
279 .unwrap_or({health_port}),
280 metrics_port: std::env::var("{upper_name}_METRICS_PORT")
281 .ok()
282 .and_then(|v| v.parse().ok())
283 .unwrap_or({metrics_port}),
284 }},
285 }}
286 }}
287}}
288"#,
289 upper_name = upper_name,
290 service_name = service_name,
291 max_attempts = consumer.retry.max_attempts,
292 initial_backoff_ms = consumer.retry.initial_backoff_ms,
293 max_backoff_ms = consumer.retry.max_backoff_ms,
294 multiplier = consumer.retry.multiplier,
295 health_port = consumer.server.health_port,
296 metrics_port = consumer.server.metrics_port,
297 )
298}
299
300pub fn error_rs(config: &ProjectConfig) -> String {
302 let consumer = config.consumer.as_ref().unwrap();
303 let pascal_name = to_pascal_case(&consumer.service_name);
304
305 format!(
306 r#"//! Error types for the consumer service
307
308use thiserror::Error;
309
310/// Consumer error type
311#[derive(Error, Debug)]
312pub enum {pascal_name}Error {{
313 #[error("Broker error: {{0}}")]
314 BrokerError(String),
315
316 #[error("Deserialization error: {{0}}")]
317 DeserializationError(String),
318
319 #[error("Handler error: {{0}}")]
320 HandlerError(String),
321
322 #[error("Idempotency error: {{0}}")]
323 IdempotencyError(String),
324
325 #[error("DLQ error: {{0}}")]
326 DlqError(String),
327
328 #[error("Configuration error: {{0}}")]
329 ConfigError(String),
330}}
331
332/// Result type alias
333pub type Result<T> = std::result::Result<T, {pascal_name}Error>;
334
335impl From<{pascal_name}Error> for tonic::Status {{
336 fn from(e: {pascal_name}Error) -> Self {{
337 tonic::Status::internal(e.to_string())
338 }}
339}}
340"#,
341 pascal_name = pascal_name,
342 )
343}
344
345pub fn domain_mod(_config: &ProjectConfig) -> String {
347 r#"//! Domain layer - Events and handlers
348
349pub mod events;
350pub mod handlers;
351
352pub use events::*;
353pub use handlers::*;
354"#
355 .to_string()
356}
357
358pub fn domain_events(_config: &ProjectConfig) -> String {
360 r#"//! Domain events
361
362use serde::{Deserialize, Serialize};
363use chrono::{DateTime, Utc};
364use uuid::Uuid;
365
366/// Base event envelope
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct EventEnvelope<T> {
369 /// Unique event ID (used for idempotency)
370 pub id: Uuid,
371 /// Event type name
372 pub event_type: String,
373 /// Event timestamp
374 pub timestamp: DateTime<Utc>,
375 /// Correlation ID for tracing
376 pub correlation_id: Option<Uuid>,
377 /// Event payload
378 pub payload: T,
379 /// Event metadata
380 pub metadata: EventMetadata,
381}
382
383/// Event metadata
384#[derive(Debug, Clone, Default, Serialize, Deserialize)]
385pub struct EventMetadata {
386 /// Source service
387 pub source: Option<String>,
388 /// Event version
389 pub version: Option<String>,
390 /// Retry count
391 pub retry_count: u32,
392}
393
394impl<T> EventEnvelope<T> {
395 /// Create a new event envelope
396 pub fn new(event_type: impl Into<String>, payload: T) -> Self {
397 Self {
398 id: Uuid::new_v4(),
399 event_type: event_type.into(),
400 timestamp: Utc::now(),
401 correlation_id: None,
402 payload,
403 metadata: EventMetadata::default(),
404 }
405 }
406
407 /// Set correlation ID
408 pub fn with_correlation_id(mut self, id: Uuid) -> Self {
409 self.correlation_id = Some(id);
410 self
411 }
412}
413
414/// Example domain event: User Created
415#[derive(Debug, Clone, Serialize, Deserialize)]
416pub struct UserCreated {
417 pub user_id: Uuid,
418 pub email: String,
419 pub name: String,
420}
421
422/// Example domain event: Order Placed
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct OrderPlaced {
425 pub order_id: Uuid,
426 pub user_id: Uuid,
427 pub total: f64,
428 pub items: Vec<OrderItem>,
429}
430
431/// Order item
432#[derive(Debug, Clone, Serialize, Deserialize)]
433pub struct OrderItem {
434 pub product_id: Uuid,
435 pub quantity: u32,
436 pub price: f64,
437}
438"#
439 .to_string()
440}
441
442pub fn domain_handlers(config: &ProjectConfig) -> String {
444 let consumer = config.consumer.as_ref().unwrap();
445 let pascal_name = to_pascal_case(&consumer.service_name);
446
447 format!(
448 r#"//! Event handlers
449
450use async_trait::async_trait;
451use tracing::{{info, warn}};
452
453use crate::domain::events::*;
454use crate::error::Result;
455
456/// Event handler trait
457#[async_trait]
458pub trait EventHandler<E>: Send + Sync {{
459 /// Handle an event
460 async fn handle(&self, event: EventEnvelope<E>) -> Result<()>;
461}}
462
463/// User created event handler
464pub struct UserCreatedHandler;
465
466#[async_trait]
467impl EventHandler<UserCreated> for UserCreatedHandler {{
468 async fn handle(&self, event: EventEnvelope<UserCreated>) -> Result<()> {{
469 info!(
470 user_id = %event.payload.user_id,
471 email = %event.payload.email,
472 "Processing UserCreated event"
473 );
474
475 // TODO: Implement your business logic here
476 // Examples:
477 // - Send welcome email
478 // - Create user profile
479 // - Initialize user preferences
480
481 Ok(())
482 }}
483}}
484
485/// Order placed event handler
486pub struct OrderPlacedHandler;
487
488#[async_trait]
489impl EventHandler<OrderPlaced> for OrderPlacedHandler {{
490 async fn handle(&self, event: EventEnvelope<OrderPlaced>) -> Result<()> {{
491 info!(
492 order_id = %event.payload.order_id,
493 user_id = %event.payload.user_id,
494 total = %event.payload.total,
495 "Processing OrderPlaced event"
496 );
497
498 // TODO: Implement your business logic here
499 // Examples:
500 // - Reserve inventory
501 // - Process payment
502 // - Send confirmation email
503
504 Ok(())
505 }}
506}}
507
508/// Handler registry for routing events to handlers
509pub struct {pascal_name}HandlerRegistry {{
510 user_created_handler: UserCreatedHandler,
511 order_placed_handler: OrderPlacedHandler,
512}}
513
514impl Default for {pascal_name}HandlerRegistry {{
515 fn default() -> Self {{
516 Self::new()
517 }}
518}}
519
520impl {pascal_name}HandlerRegistry {{
521 pub fn new() -> Self {{
522 Self {{
523 user_created_handler: UserCreatedHandler,
524 order_placed_handler: OrderPlacedHandler,
525 }}
526 }}
527
528 /// Route and handle an event based on its type
529 pub async fn handle_event(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
530 match event_type {{
531 "UserCreated" => {{
532 let event: EventEnvelope<UserCreated> = serde_json::from_slice(payload)
533 .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
534 self.user_created_handler.handle(event).await
535 }}
536 "OrderPlaced" => {{
537 let event: EventEnvelope<OrderPlaced> = serde_json::from_slice(payload)
538 .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
539 self.order_placed_handler.handle(event).await
540 }}
541 _ => {{
542 warn!(event_type = %event_type, "Unknown event type, skipping");
543 Ok(())
544 }}
545 }}
546 }}
547}}
548"#,
549 pascal_name = pascal_name,
550 )
551}
552
553pub fn application_mod(_config: &ProjectConfig) -> String {
555 r#"//! Application layer - Consumer orchestration
556
557pub mod consumer;
558
559pub use consumer::*;
560"#
561 .to_string()
562}
563
564pub fn application_consumer(config: &ProjectConfig) -> String {
566 let consumer = config.consumer.as_ref().unwrap();
567 let pascal_name = to_pascal_case(&consumer.service_name);
568
569 format!(
570 r#"//! Main consumer implementation
571
572use std::sync::Arc;
573use std::time::Duration;
574use async_trait::async_trait;
575use tracing::{{info, warn, error, instrument}};
576
577use crate::config::{{RetryConfig, DlqConfig}};
578use crate::domain::{pascal_name}HandlerRegistry;
579use crate::error::{{Result, {pascal_name}Error}};
580use crate::infrastructure::{{MessageBroker, IdempotencyStore}};
581
582/// Message from the broker
583#[derive(Debug, Clone)]
584pub struct Message {{
585 /// Message key
586 pub key: Option<String>,
587 /// Message payload
588 pub payload: Vec<u8>,
589 /// Message headers
590 pub headers: Vec<(String, String)>,
591 /// Topic
592 pub topic: String,
593 /// Partition
594 pub partition: i32,
595 /// Offset
596 pub offset: i64,
597}}
598
599impl Message {{
600 /// Get the event ID from headers
601 pub fn event_id(&self) -> Option<String> {{
602 self.headers
603 .iter()
604 .find(|(k, _)| k == "event_id")
605 .map(|(_, v)| v.clone())
606 }}
607
608 /// Get the event type from headers
609 pub fn event_type(&self) -> Option<String> {{
610 self.headers
611 .iter()
612 .find(|(k, _)| k == "event_type")
613 .map(|(_, v)| v.clone())
614 }}
615}}
616
617/// Main consumer service
618pub struct {pascal_name}Consumer<B: MessageBroker, I: IdempotencyStore> {{
619 broker: B,
620 idempotency_store: Arc<I>,
621 handler_registry: {pascal_name}HandlerRegistry,
622 retry_config: RetryConfig,
623 dlq_config: DlqConfig,
624}}
625
626impl<B: MessageBroker, I: IdempotencyStore> {pascal_name}Consumer<B, I> {{
627 pub fn new(
628 broker: B,
629 idempotency_store: Arc<I>,
630 retry_config: RetryConfig,
631 dlq_config: DlqConfig,
632 ) -> Self {{
633 Self {{
634 broker,
635 idempotency_store,
636 handler_registry: {pascal_name}HandlerRegistry::new(),
637 retry_config,
638 dlq_config,
639 }}
640 }}
641
642 /// Run the consumer loop
643 pub async fn run(&self, topics: &[String]) -> Result<()> {{
644 info!("Subscribing to topics: {{:?}}", topics);
645 self.broker.subscribe(topics).await?;
646
647 loop {{
648 match self.broker.poll(Duration::from_secs(1)).await {{
649 Ok(Some(message)) => {{
650 if let Err(e) = self.process_message(message).await {{
651 error!("Error processing message: {{}}", e);
652 }}
653 }}
654 Ok(None) => {{
655 // No message available, continue polling
656 }}
657 Err(e) => {{
658 error!("Error polling for messages: {{}}", e);
659 tokio::time::sleep(Duration::from_secs(1)).await;
660 }}
661 }}
662 }}
663 }}
664
665 #[instrument(skip(self, message), fields(topic = %message.topic, partition = %message.partition, offset = %message.offset))]
666 async fn process_message(&self, message: Message) -> Result<()> {{
667 let event_id = message.event_id().unwrap_or_else(|| {{
668 format!("{{}}:{{}}:{{}}", message.topic, message.partition, message.offset)
669 }});
670
671 // Check idempotency
672 if self.idempotency_store.exists(&event_id).await? {{
673 info!(event_id = %event_id, "Event already processed, skipping");
674 self.broker.commit(&message).await?;
675 return Ok(());
676 }}
677
678 let event_type = message.event_type().unwrap_or_else(|| "Unknown".to_string());
679
680 // Process with retry
681 let result = self.process_with_retry(&event_type, &message.payload).await;
682
683 match result {{
684 Ok(()) => {{
685 // Mark as processed
686 self.idempotency_store.mark_processed(&event_id).await?;
687 self.broker.commit(&message).await?;
688 info!(event_id = %event_id, "Event processed successfully");
689 }}
690 Err(e) => {{
691 error!(event_id = %event_id, error = %e, "Failed to process event");
692
693 if self.dlq_config.enabled {{
694 // Send to DLQ
695 let dlq_topic = format!("{{}}{{}}", message.topic, self.dlq_config.suffix);
696 self.broker.send_to_dlq(&dlq_topic, &message).await?;
697 self.broker.commit(&message).await?;
698 warn!(event_id = %event_id, dlq_topic = %dlq_topic, "Event sent to DLQ");
699 }} else {{
700 return Err(e);
701 }}
702 }}
703 }}
704
705 Ok(())
706 }}
707
708 async fn process_with_retry(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
709 let mut attempts = 0;
710 let mut backoff = self.retry_config.initial_backoff;
711
712 loop {{
713 attempts += 1;
714
715 match self.handler_registry.handle_event(event_type, payload).await {{
716 Ok(()) => return Ok(()),
717 Err(e) if attempts >= self.retry_config.max_attempts => {{
718 return Err(e);
719 }}
720 Err(e) => {{
721 warn!(
722 attempt = attempts,
723 max_attempts = self.retry_config.max_attempts,
724 error = %e,
725 "Handler failed, retrying"
726 );
727
728 tokio::time::sleep(backoff).await;
729 backoff = std::cmp::min(
730 Duration::from_secs_f64(backoff.as_secs_f64() * self.retry_config.multiplier),
731 self.retry_config.max_backoff,
732 );
733 }}
734 }}
735 }}
736 }}
737}}
738"#,
739 pascal_name = pascal_name,
740 )
741}
742
743pub fn infrastructure_mod(_config: &ProjectConfig) -> String {
745 r#"//! Infrastructure layer - Message brokers and stores
746
747mod broker;
748mod idempotency;
749mod health;
750
751pub use broker::*;
752pub use idempotency::*;
753pub use health::*;
754"#
755 .to_string()
756}
757
758pub fn infrastructure_broker(config: &ProjectConfig) -> String {
760 let consumer = config.consumer.as_ref().unwrap();
761 let pascal_name = to_pascal_case(&consumer.service_name);
762
763 format!(
764 r#"//! Message broker implementations
765
766use std::time::Duration;
767use async_trait::async_trait;
768use tracing::{{info, debug}};
769
770use crate::config::BrokerConfig;
771use crate::application::Message;
772use crate::error::{{Result, {pascal_name}Error}};
773
774/// Message broker trait
775#[async_trait]
776pub trait MessageBroker: Send + Sync {{
777 /// Subscribe to topics
778 async fn subscribe(&self, topics: &[String]) -> Result<()>;
779
780 /// Poll for messages
781 async fn poll(&self, timeout: Duration) -> Result<Option<Message>>;
782
783 /// Commit message offset
784 async fn commit(&self, message: &Message) -> Result<()>;
785
786 /// Send message to DLQ
787 async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()>;
788}}
789
790/// Kafka message broker implementation
791pub struct KafkaMessageBroker {{
792 // In a real implementation, this would hold the rdkafka consumer
793 _config: BrokerConfig,
794}}
795
796impl KafkaMessageBroker {{
797 pub async fn new(config: &BrokerConfig) -> Result<Self> {{
798 info!("Connecting to Kafka brokers: {{}}", config.brokers);
799
800 // In a real implementation:
801 // let consumer: StreamConsumer = ClientConfig::new()
802 // .set("bootstrap.servers", &config.brokers)
803 // .set("group.id", &group_id)
804 // .set("enable.partition.eof", "false")
805 // .set("session.timeout.ms", "6000")
806 // .set("enable.auto.commit", "false")
807 // .create()?;
808
809 Ok(Self {{
810 _config: config.clone(),
811 }})
812 }}
813}}
814
815#[async_trait]
816impl MessageBroker for KafkaMessageBroker {{
817 async fn subscribe(&self, topics: &[String]) -> Result<()> {{
818 info!("Subscribing to topics: {{:?}}", topics);
819 // In a real implementation:
820 // self.consumer.subscribe(&topics.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;
821 Ok(())
822 }}
823
824 async fn poll(&self, timeout: Duration) -> Result<Option<Message>> {{
825 debug!("Polling for messages with timeout: {{:?}}", timeout);
826
827 // In a real implementation:
828 // match self.consumer.poll(timeout) {{
829 // Some(Ok(msg)) => {{
830 // let headers = msg.headers()
831 // .map(|h| {{
832 // (0..h.count())
833 // .filter_map(|i| {{
834 // h.get(i).map(|header| {{
835 // (header.key.to_string(), String::from_utf8_lossy(header.value.unwrap_or_default()).to_string())
836 // }})
837 // }})
838 // .collect()
839 // }})
840 // .unwrap_or_default();
841 //
842 // Ok(Some(Message {{
843 // key: msg.key().map(|k| String::from_utf8_lossy(k).to_string()),
844 // payload: msg.payload().unwrap_or_default().to_vec(),
845 // headers,
846 // topic: msg.topic().to_string(),
847 // partition: msg.partition(),
848 // offset: msg.offset(),
849 // }}))
850 // }}
851 // Some(Err(e)) => Err({pascal_name}Error::BrokerError(e.to_string())),
852 // None => Ok(None),
853 // }}
854
855 // Placeholder: simulate no messages
856 tokio::time::sleep(timeout).await;
857 Ok(None)
858 }}
859
860 async fn commit(&self, message: &Message) -> Result<()> {{
861 debug!(
862 topic = %message.topic,
863 partition = %message.partition,
864 offset = %message.offset,
865 "Committing offset"
866 );
867 // In a real implementation:
868 // self.consumer.commit_message(&message, CommitMode::Async)?;
869 Ok(())
870 }}
871
872 async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()> {{
873 info!(
874 dlq_topic = %dlq_topic,
875 original_topic = %message.topic,
876 "Sending message to DLQ"
877 );
878 // In a real implementation, use a producer to send to DLQ
879 Ok(())
880 }}
881}}
882"#,
883 pascal_name = pascal_name,
884 )
885}
886
887pub fn infrastructure_idempotency(config: &ProjectConfig) -> String {
889 let consumer = config.consumer.as_ref().unwrap();
890 let pascal_name = to_pascal_case(&consumer.service_name);
891
892 format!(
893 r#"//! Idempotency store implementations
894
895use std::collections::HashSet;
896use std::sync::RwLock;
897use async_trait::async_trait;
898
899use crate::error::{{Result, {pascal_name}Error}};
900
901/// Idempotency store trait
902#[async_trait]
903pub trait IdempotencyStore: Send + Sync {{
904 /// Check if an event has already been processed
905 async fn exists(&self, event_id: &str) -> Result<bool>;
906
907 /// Mark an event as processed
908 async fn mark_processed(&self, event_id: &str) -> Result<()>;
909}}
910
911/// In-memory idempotency store (for development/testing)
912pub struct InMemoryIdempotencyStore {{
913 processed: RwLock<HashSet<String>>,
914}}
915
916impl InMemoryIdempotencyStore {{
917 pub fn new() -> Self {{
918 Self {{
919 processed: RwLock::new(HashSet::new()),
920 }}
921 }}
922}}
923
924impl Default for InMemoryIdempotencyStore {{
925 fn default() -> Self {{
926 Self::new()
927 }}
928}}
929
930#[async_trait]
931impl IdempotencyStore for InMemoryIdempotencyStore {{
932 async fn exists(&self, event_id: &str) -> Result<bool> {{
933 let guard = self.processed.read()
934 .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
935 Ok(guard.contains(event_id))
936 }}
937
938 async fn mark_processed(&self, event_id: &str) -> Result<()> {{
939 let mut guard = self.processed.write()
940 .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
941 guard.insert(event_id.to_string());
942 Ok(())
943 }}
944}}
945
946// TODO: Add Redis implementation
947// pub struct RedisIdempotencyStore {{ ... }}
948
949// TODO: Add PostgreSQL implementation
950// pub struct PostgresIdempotencyStore {{ ... }}
951"#,
952 pascal_name = pascal_name,
953 )
954}
955
956pub fn infrastructure_health(_config: &ProjectConfig) -> String {
958 r#"//! Health check server
959
960use std::net::SocketAddr;
961use axum::{routing::get, Router, Json};
962use serde::Serialize;
963use tracing::info;
964
965/// Health response
966#[derive(Serialize)]
967pub struct HealthResponse {
968 pub status: String,
969 pub service: String,
970}
971
972/// Health check server
973pub struct HealthServer {
974 port: u16,
975}
976
977impl HealthServer {
978 pub fn new(port: u16) -> Self {
979 Self { port }
980 }
981
982 pub async fn run(&self) -> anyhow::Result<()> {
983 let app = Router::new()
984 .route("/health", get(health_handler))
985 .route("/ready", get(ready_handler));
986
987 let addr: SocketAddr = ([0, 0, 0, 0], self.port).into();
988 info!("Health server listening on {}", addr);
989
990 let listener = tokio::net::TcpListener::bind(addr).await?;
991 axum::serve(listener, app).await?;
992
993 Ok(())
994 }
995}
996
997async fn health_handler() -> Json<HealthResponse> {
998 Json(HealthResponse {
999 status: "healthy".to_string(),
1000 service: env!("CARGO_PKG_NAME").to_string(),
1001 })
1002}
1003
1004async fn ready_handler() -> Json<HealthResponse> {
1005 // TODO: Check actual readiness (broker connection, etc.)
1006 Json(HealthResponse {
1007 status: "ready".to_string(),
1008 service: env!("CARGO_PKG_NAME").to_string(),
1009 })
1010}
1011"#
1012 .to_string()
1013}
1014
1015pub fn readme(config: &ProjectConfig) -> String {
1017 let consumer = config.consumer.as_ref().unwrap();
1018
1019 format!(
1020 r#"# {display_name}
1021
1022An event consumer service built with AllFrame.
1023
1024## Features
1025
1026- **{broker} Consumer**: Event-driven message processing
1027- **Idempotency**: Duplicate event detection
1028- **Dead Letter Queue**: Failed message handling
1029- **Retry with Backoff**: Configurable retry strategy
1030- **Health Checks**: Kubernetes-ready endpoints
1031
1032## Configuration
1033
1034Set the following environment variables:
1035
1036```bash
1037# Broker configuration
1038{upper_name}_BROKERS=localhost:9092
1039{upper_name}_GROUP_ID={service_name}-group
1040{upper_name}_TOPICS=events
1041
1042# Retry configuration
1043{upper_name}_RETRY_MAX_ATTEMPTS=3
1044{upper_name}_RETRY_INITIAL_BACKOFF_MS=100
1045{upper_name}_RETRY_MAX_BACKOFF_MS=10000
1046
1047# DLQ configuration
1048{upper_name}_DLQ_ENABLED=true
1049{upper_name}_DLQ_SUFFIX=.dlq
1050
1051# Server configuration
1052{upper_name}_HEALTH_PORT=8081
1053{upper_name}_METRICS_PORT=9090
1054```
1055
1056## Running
1057
1058```bash
1059cargo run
1060```
1061
1062## Health Endpoints
1063
1064- `GET /health` - Liveness check
1065- `GET /ready` - Readiness check
1066
1067## Adding Event Handlers
1068
10691. Define your event type in `src/domain/events.rs`
10702. Create a handler in `src/domain/handlers.rs`
10713. Register it in the `HandlerRegistry`
1072
1073## Generated by AllFrame Ignite
1074"#,
1075 display_name = consumer.display_name,
1076 broker = consumer.broker,
1077 service_name = consumer.service_name,
1078 upper_name = consumer.service_name.to_uppercase().replace('-', "_"),
1079 )
1080}
1081
1082pub fn dockerfile(config: &ProjectConfig) -> String {
1084 let name = &config.name;
1085
1086 format!(
1087 r#"# Build stage
1088FROM rust:1.86-slim as builder
1089
1090WORKDIR /app
1091COPY . .
1092RUN cargo build --release
1093
1094# Runtime stage
1095FROM debian:bookworm-slim
1096
1097RUN apt-get update && apt-get install -y \
1098 ca-certificates \
1099 libssl3 \
1100 && rm -rf /var/lib/apt/lists/*
1101
1102WORKDIR /app
1103COPY --from=builder /app/target/release/{name} /app/{name}
1104
1105ENV RUST_LOG=info
1106
1107EXPOSE 8081
1108
1109CMD ["/app/{name}"]
1110"#,
1111 name = name,
1112 )
1113}
1114
#[cfg(test)]
mod tests {
    use super::*;

    /// Kebab-case, snake_case and single-word names all map to PascalCase.
    #[test]
    fn test_to_pascal_case() {
        let cases = [
            ("order-processor", "OrderProcessor"),
            ("user_handler", "UserHandler"),
            ("simple", "Simple"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(to_pascal_case(input), expected);
        }
    }
}