1use crate::config::{MessageBroker, ProjectConfig};
7
/// Converts a `-`/`_` separated name into PascalCase.
///
/// The first character of every segment is upper-cased and the remainder of the
/// segment is copied unchanged. Empty segments (leading, trailing or doubled
/// separators) contribute nothing to the result.
fn to_pascal_case(s: &str) -> String {
    let mut pascal = String::new();
    for segment in s.split(|c: char| matches!(c, '-' | '_')) {
        let mut rest = segment.chars();
        if let Some(head) = rest.next() {
            // Upper-casing a single char may yield several chars (e.g. 'ß' -> "SS").
            pascal.extend(head.to_uppercase());
            pascal.push_str(rest.as_str());
        }
    }
    pascal
}
20
21pub fn cargo_toml(config: &ProjectConfig) -> String {
23 let consumer = config.consumer.as_ref().unwrap();
24 let name = &config.name;
25
26 let broker_deps = match consumer.broker {
27 MessageBroker::Kafka => r#"rdkafka = { version = "0.36", features = ["cmake-build"] }"#,
28 MessageBroker::RabbitMq => r#"lapin = "2.3""#,
29 MessageBroker::Redis => {
30 r#"redis = { version = "0.25", features = ["tokio-comp", "streams"] }"#
31 }
32 MessageBroker::Sqs => r#"aws-sdk-sqs = "1.0""#,
33 MessageBroker::PubSub => r#"google-cloud-pubsub = "0.25""#,
34 };
35
36 format!(
37 r#"[package]
38name = "{name}"
39version = "0.1.0"
40edition = "2021"
41rust-version = "1.75"
42description = "{display_name}"
43
44[dependencies]
45# AllFrame
46allframe-core = {{ version = "0.1", features = ["resilience", "otel"] }}
47
48# Message Broker
49{broker_deps}
50
51# Async
52tokio = {{ version = "1", features = ["full"] }}
53async-trait = "0.1"
54futures = "0.3"
55
56# Serialization
57serde = {{ version = "1.0", features = ["derive"] }}
58serde_json = "1.0"
59
60# Error handling
61thiserror = "2.0"
62anyhow = "1.0"
63
64# Tracing & Metrics
65tracing = "0.1"
66tracing-subscriber = {{ version = "0.3", features = ["env-filter"] }}
67opentelemetry = {{ version = "0.27", features = ["metrics"] }}
68
69# Utilities
70chrono = {{ version = "0.4", features = ["serde"] }}
71uuid = {{ version = "1.0", features = ["v4", "serde"] }}
72dotenvy = "0.15"
73
74# Health checks
75axum = "0.7"
76
77[dev-dependencies]
78tokio-test = "0.4"
79mockall = "0.13"
80
81[[bin]]
82name = "{name}"
83path = "src/main.rs"
84"#,
85 name = name,
86 display_name = consumer.display_name,
87 broker_deps = broker_deps,
88 )
89}
90
91pub fn main_rs(config: &ProjectConfig) -> String {
93 let consumer = config.consumer.as_ref().unwrap();
94 let pascal_name = to_pascal_case(&consumer.service_name);
95
96 format!(
97 r#"//! {display_name}
98//!
99//! An event consumer service with idempotency, retry, and dead letter queue handling.
100
101use std::sync::Arc;
102use tracing::info;
103
104mod config;
105mod error;
106mod domain;
107mod application;
108mod infrastructure;
109
110use config::Config;
111use application::{pascal_name}Consumer;
112use infrastructure::{{
113 KafkaMessageBroker,
114 InMemoryIdempotencyStore,
115 HealthServer,
116}};
117
118#[tokio::main]
119async fn main() -> anyhow::Result<()> {{
120 // Load environment variables
121 dotenvy::dotenv().ok();
122
123 // Initialize tracing
124 tracing_subscriber::fmt()
125 .with_env_filter(
126 tracing_subscriber::EnvFilter::from_default_env()
127 .add_directive(tracing::Level::INFO.into()),
128 )
129 .init();
130
131 // Load configuration
132 let config = Config::from_env();
133 info!("Starting {display_name}");
134 info!("Broker: {{}}", config.broker.brokers);
135 info!("Topics: {{:?}}", config.topics);
136
137 // Create idempotency store
138 let idempotency_store = Arc::new(InMemoryIdempotencyStore::new());
139
140 // Create message broker
141 let broker = KafkaMessageBroker::new(&config.broker).await?;
142
143 // Create consumer
144 let consumer = {pascal_name}Consumer::new(
145 broker,
146 idempotency_store,
147 config.retry.clone(),
148 config.dlq.clone(),
149 );
150
151 // Start health server in background
152 let health_handle = tokio::spawn(async move {{
153 let health_server = HealthServer::new(config.server.health_port);
154 health_server.run().await
155 }});
156
157 // Run consumer
158 info!("Consumer started, waiting for messages...");
159 consumer.run(&config.topics).await?;
160
161 health_handle.abort();
162 info!("Consumer shutdown complete");
163 Ok(())
164}}
165"#,
166 display_name = consumer.display_name,
167 pascal_name = pascal_name,
168 )
169}
170
171pub fn config_rs(config: &ProjectConfig) -> String {
173 let consumer = config.consumer.as_ref().unwrap();
174 let service_name = &consumer.service_name;
175 let upper_name = service_name.to_uppercase().replace('-', "_");
176
177 format!(
178 r#"//! Configuration module
179//!
180//! Loads configuration from environment variables.
181
182use std::time::Duration;
183
184/// Main configuration
185#[derive(Debug, Clone)]
186pub struct Config {{
187 pub broker: BrokerConfig,
188 pub topics: Vec<String>,
189 pub group_id: String,
190 pub retry: RetryConfig,
191 pub dlq: DlqConfig,
192 pub server: ServerConfig,
193}}
194
195/// Message broker configuration
196#[derive(Debug, Clone)]
197pub struct BrokerConfig {{
198 pub brokers: String,
199 pub security_protocol: String,
200 pub sasl_mechanism: Option<String>,
201 pub sasl_username: Option<String>,
202 pub sasl_password: Option<String>,
203}}
204
205/// Retry configuration
206#[derive(Debug, Clone)]
207pub struct RetryConfig {{
208 pub max_attempts: u32,
209 pub initial_backoff: Duration,
210 pub max_backoff: Duration,
211 pub multiplier: f64,
212}}
213
214/// Dead Letter Queue configuration
215#[derive(Debug, Clone)]
216pub struct DlqConfig {{
217 pub enabled: bool,
218 pub suffix: String,
219}}
220
221/// Server configuration
222#[derive(Debug, Clone)]
223pub struct ServerConfig {{
224 pub health_port: u16,
225 pub metrics_port: u16,
226}}
227
228impl Config {{
229 pub fn from_env() -> Self {{
230 Self {{
231 broker: BrokerConfig {{
232 brokers: std::env::var("{upper_name}_BROKERS")
233 .unwrap_or_else(|_| "localhost:9092".to_string()),
234 security_protocol: std::env::var("{upper_name}_SECURITY_PROTOCOL")
235 .unwrap_or_else(|_| "PLAINTEXT".to_string()),
236 sasl_mechanism: std::env::var("{upper_name}_SASL_MECHANISM").ok(),
237 sasl_username: std::env::var("{upper_name}_SASL_USERNAME").ok(),
238 sasl_password: std::env::var("{upper_name}_SASL_PASSWORD").ok(),
239 }},
240 topics: std::env::var("{upper_name}_TOPICS")
241 .unwrap_or_else(|_| "events".to_string())
242 .split(',')
243 .map(|s| s.trim().to_string())
244 .collect(),
245 group_id: std::env::var("{upper_name}_GROUP_ID")
246 .unwrap_or_else(|_| "{service_name}-group".to_string()),
247 retry: RetryConfig {{
248 max_attempts: std::env::var("{upper_name}_RETRY_MAX_ATTEMPTS")
249 .ok()
250 .and_then(|v| v.parse().ok())
251 .unwrap_or({max_attempts}),
252 initial_backoff: Duration::from_millis(
253 std::env::var("{upper_name}_RETRY_INITIAL_BACKOFF_MS")
254 .ok()
255 .and_then(|v| v.parse().ok())
256 .unwrap_or({initial_backoff_ms}),
257 ),
258 max_backoff: Duration::from_millis(
259 std::env::var("{upper_name}_RETRY_MAX_BACKOFF_MS")
260 .ok()
261 .and_then(|v| v.parse().ok())
262 .unwrap_or({max_backoff_ms}),
263 ),
264 multiplier: std::env::var("{upper_name}_RETRY_MULTIPLIER")
265 .ok()
266 .and_then(|v| v.parse().ok())
267 .unwrap_or({multiplier}),
268 }},
269 dlq: DlqConfig {{
270 enabled: std::env::var("{upper_name}_DLQ_ENABLED")
271 .ok()
272 .and_then(|v| v.parse().ok())
273 .unwrap_or(true),
274 suffix: std::env::var("{upper_name}_DLQ_SUFFIX")
275 .unwrap_or_else(|_| ".dlq".to_string()),
276 }},
277 server: ServerConfig {{
278 health_port: std::env::var("{upper_name}_HEALTH_PORT")
279 .ok()
280 .and_then(|v| v.parse().ok())
281 .unwrap_or({health_port}),
282 metrics_port: std::env::var("{upper_name}_METRICS_PORT")
283 .ok()
284 .and_then(|v| v.parse().ok())
285 .unwrap_or({metrics_port}),
286 }},
287 }}
288 }}
289}}
290"#,
291 upper_name = upper_name,
292 service_name = service_name,
293 max_attempts = consumer.retry.max_attempts,
294 initial_backoff_ms = consumer.retry.initial_backoff_ms,
295 max_backoff_ms = consumer.retry.max_backoff_ms,
296 multiplier = consumer.retry.multiplier,
297 health_port = consumer.server.health_port,
298 metrics_port = consumer.server.metrics_port,
299 )
300}
301
302pub fn error_rs(config: &ProjectConfig) -> String {
304 let consumer = config.consumer.as_ref().unwrap();
305 let pascal_name = to_pascal_case(&consumer.service_name);
306
307 format!(
308 r#"//! Error types for the consumer service
309
310use thiserror::Error;
311
312/// Consumer error type
313#[derive(Error, Debug)]
314pub enum {pascal_name}Error {{
315 #[error("Broker error: {{0}}")]
316 BrokerError(String),
317
318 #[error("Deserialization error: {{0}}")]
319 DeserializationError(String),
320
321 #[error("Handler error: {{0}}")]
322 HandlerError(String),
323
324 #[error("Idempotency error: {{0}}")]
325 IdempotencyError(String),
326
327 #[error("DLQ error: {{0}}")]
328 DlqError(String),
329
330 #[error("Configuration error: {{0}}")]
331 ConfigError(String),
332}}
333
334/// Result type alias
335pub type Result<T> = std::result::Result<T, {pascal_name}Error>;
336
337impl From<{pascal_name}Error> for tonic::Status {{
338 fn from(e: {pascal_name}Error) -> Self {{
339 tonic::Status::internal(e.to_string())
340 }}
341}}
342"#,
343 pascal_name = pascal_name,
344 )
345}
346
347pub fn domain_mod(_config: &ProjectConfig) -> String {
349 r#"//! Domain layer - Events and handlers
350
351pub mod events;
352pub mod handlers;
353
354pub use events::*;
355pub use handlers::*;
356"#
357 .to_string()
358}
359
360pub fn domain_events(_config: &ProjectConfig) -> String {
362 r#"//! Domain events
363
364use serde::{Deserialize, Serialize};
365use chrono::{DateTime, Utc};
366use uuid::Uuid;
367
368/// Base event envelope
369#[derive(Debug, Clone, Serialize, Deserialize)]
370pub struct EventEnvelope<T> {
371 /// Unique event ID (used for idempotency)
372 pub id: Uuid,
373 /// Event type name
374 pub event_type: String,
375 /// Event timestamp
376 pub timestamp: DateTime<Utc>,
377 /// Correlation ID for tracing
378 pub correlation_id: Option<Uuid>,
379 /// Event payload
380 pub payload: T,
381 /// Event metadata
382 pub metadata: EventMetadata,
383}
384
385/// Event metadata
386#[derive(Debug, Clone, Default, Serialize, Deserialize)]
387pub struct EventMetadata {
388 /// Source service
389 pub source: Option<String>,
390 /// Event version
391 pub version: Option<String>,
392 /// Retry count
393 pub retry_count: u32,
394}
395
396impl<T> EventEnvelope<T> {
397 /// Create a new event envelope
398 pub fn new(event_type: impl Into<String>, payload: T) -> Self {
399 Self {
400 id: Uuid::new_v4(),
401 event_type: event_type.into(),
402 timestamp: Utc::now(),
403 correlation_id: None,
404 payload,
405 metadata: EventMetadata::default(),
406 }
407 }
408
409 /// Set correlation ID
410 pub fn with_correlation_id(mut self, id: Uuid) -> Self {
411 self.correlation_id = Some(id);
412 self
413 }
414}
415
416/// Example domain event: User Created
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct UserCreated {
419 pub user_id: Uuid,
420 pub email: String,
421 pub name: String,
422}
423
424/// Example domain event: Order Placed
425#[derive(Debug, Clone, Serialize, Deserialize)]
426pub struct OrderPlaced {
427 pub order_id: Uuid,
428 pub user_id: Uuid,
429 pub total: f64,
430 pub items: Vec<OrderItem>,
431}
432
433/// Order item
434#[derive(Debug, Clone, Serialize, Deserialize)]
435pub struct OrderItem {
436 pub product_id: Uuid,
437 pub quantity: u32,
438 pub price: f64,
439}
440"#
441 .to_string()
442}
443
444pub fn domain_handlers(config: &ProjectConfig) -> String {
446 let consumer = config.consumer.as_ref().unwrap();
447 let pascal_name = to_pascal_case(&consumer.service_name);
448
449 format!(
450 r#"//! Event handlers
451
452use async_trait::async_trait;
453use tracing::{{info, warn}};
454
455use crate::domain::events::*;
456use crate::error::Result;
457
458/// Event handler trait
459#[async_trait]
460pub trait EventHandler<E>: Send + Sync {{
461 /// Handle an event
462 async fn handle(&self, event: EventEnvelope<E>) -> Result<()>;
463}}
464
465/// User created event handler
466pub struct UserCreatedHandler;
467
468#[async_trait]
469impl EventHandler<UserCreated> for UserCreatedHandler {{
470 async fn handle(&self, event: EventEnvelope<UserCreated>) -> Result<()> {{
471 info!(
472 user_id = %event.payload.user_id,
473 email = %event.payload.email,
474 "Processing UserCreated event"
475 );
476
477 // TODO: Implement your business logic here
478 // Examples:
479 // - Send welcome email
480 // - Create user profile
481 // - Initialize user preferences
482
483 Ok(())
484 }}
485}}
486
487/// Order placed event handler
488pub struct OrderPlacedHandler;
489
490#[async_trait]
491impl EventHandler<OrderPlaced> for OrderPlacedHandler {{
492 async fn handle(&self, event: EventEnvelope<OrderPlaced>) -> Result<()> {{
493 info!(
494 order_id = %event.payload.order_id,
495 user_id = %event.payload.user_id,
496 total = %event.payload.total,
497 "Processing OrderPlaced event"
498 );
499
500 // TODO: Implement your business logic here
501 // Examples:
502 // - Reserve inventory
503 // - Process payment
504 // - Send confirmation email
505
506 Ok(())
507 }}
508}}
509
510/// Handler registry for routing events to handlers
511pub struct {pascal_name}HandlerRegistry {{
512 user_created_handler: UserCreatedHandler,
513 order_placed_handler: OrderPlacedHandler,
514}}
515
516impl Default for {pascal_name}HandlerRegistry {{
517 fn default() -> Self {{
518 Self::new()
519 }}
520}}
521
522impl {pascal_name}HandlerRegistry {{
523 pub fn new() -> Self {{
524 Self {{
525 user_created_handler: UserCreatedHandler,
526 order_placed_handler: OrderPlacedHandler,
527 }}
528 }}
529
530 /// Route and handle an event based on its type
531 pub async fn handle_event(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
532 match event_type {{
533 "UserCreated" => {{
534 let event: EventEnvelope<UserCreated> = serde_json::from_slice(payload)
535 .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
536 self.user_created_handler.handle(event).await
537 }}
538 "OrderPlaced" => {{
539 let event: EventEnvelope<OrderPlaced> = serde_json::from_slice(payload)
540 .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
541 self.order_placed_handler.handle(event).await
542 }}
543 _ => {{
544 warn!(event_type = %event_type, "Unknown event type, skipping");
545 Ok(())
546 }}
547 }}
548 }}
549}}
550"#,
551 pascal_name = pascal_name,
552 )
553}
554
555pub fn application_mod(_config: &ProjectConfig) -> String {
557 r#"//! Application layer - Consumer orchestration
558
559pub mod consumer;
560
561pub use consumer::*;
562"#
563 .to_string()
564}
565
566pub fn application_consumer(config: &ProjectConfig) -> String {
568 let consumer = config.consumer.as_ref().unwrap();
569 let pascal_name = to_pascal_case(&consumer.service_name);
570
571 format!(
572 r#"//! Main consumer implementation
573
574use std::sync::Arc;
575use std::time::Duration;
576use async_trait::async_trait;
577use tracing::{{info, warn, error, instrument}};
578
579use crate::config::{{RetryConfig, DlqConfig}};
580use crate::domain::{pascal_name}HandlerRegistry;
581use crate::error::{{Result, {pascal_name}Error}};
582use crate::infrastructure::{{MessageBroker, IdempotencyStore}};
583
584/// Message from the broker
585#[derive(Debug, Clone)]
586pub struct Message {{
587 /// Message key
588 pub key: Option<String>,
589 /// Message payload
590 pub payload: Vec<u8>,
591 /// Message headers
592 pub headers: Vec<(String, String)>,
593 /// Topic
594 pub topic: String,
595 /// Partition
596 pub partition: i32,
597 /// Offset
598 pub offset: i64,
599}}
600
601impl Message {{
602 /// Get the event ID from headers
603 pub fn event_id(&self) -> Option<String> {{
604 self.headers
605 .iter()
606 .find(|(k, _)| k == "event_id")
607 .map(|(_, v)| v.clone())
608 }}
609
610 /// Get the event type from headers
611 pub fn event_type(&self) -> Option<String> {{
612 self.headers
613 .iter()
614 .find(|(k, _)| k == "event_type")
615 .map(|(_, v)| v.clone())
616 }}
617}}
618
619/// Main consumer service
620pub struct {pascal_name}Consumer<B: MessageBroker, I: IdempotencyStore> {{
621 broker: B,
622 idempotency_store: Arc<I>,
623 handler_registry: {pascal_name}HandlerRegistry,
624 retry_config: RetryConfig,
625 dlq_config: DlqConfig,
626}}
627
628impl<B: MessageBroker, I: IdempotencyStore> {pascal_name}Consumer<B, I> {{
629 pub fn new(
630 broker: B,
631 idempotency_store: Arc<I>,
632 retry_config: RetryConfig,
633 dlq_config: DlqConfig,
634 ) -> Self {{
635 Self {{
636 broker,
637 idempotency_store,
638 handler_registry: {pascal_name}HandlerRegistry::new(),
639 retry_config,
640 dlq_config,
641 }}
642 }}
643
644 /// Run the consumer loop
645 pub async fn run(&self, topics: &[String]) -> Result<()> {{
646 info!("Subscribing to topics: {{:?}}", topics);
647 self.broker.subscribe(topics).await?;
648
649 loop {{
650 match self.broker.poll(Duration::from_secs(1)).await {{
651 Ok(Some(message)) => {{
652 if let Err(e) = self.process_message(message).await {{
653 error!("Error processing message: {{}}", e);
654 }}
655 }}
656 Ok(None) => {{
657 // No message available, continue polling
658 }}
659 Err(e) => {{
660 error!("Error polling for messages: {{}}", e);
661 tokio::time::sleep(Duration::from_secs(1)).await;
662 }}
663 }}
664 }}
665 }}
666
667 #[instrument(skip(self, message), fields(topic = %message.topic, partition = %message.partition, offset = %message.offset))]
668 async fn process_message(&self, message: Message) -> Result<()> {{
669 let event_id = message.event_id().unwrap_or_else(|| {{
670 format!("{{}}:{{}}:{{}}", message.topic, message.partition, message.offset)
671 }});
672
673 // Check idempotency
674 if self.idempotency_store.exists(&event_id).await? {{
675 info!(event_id = %event_id, "Event already processed, skipping");
676 self.broker.commit(&message).await?;
677 return Ok(());
678 }}
679
680 let event_type = message.event_type().unwrap_or_else(|| "Unknown".to_string());
681
682 // Process with retry
683 let result = self.process_with_retry(&event_type, &message.payload).await;
684
685 match result {{
686 Ok(()) => {{
687 // Mark as processed
688 self.idempotency_store.mark_processed(&event_id).await?;
689 self.broker.commit(&message).await?;
690 info!(event_id = %event_id, "Event processed successfully");
691 }}
692 Err(e) => {{
693 error!(event_id = %event_id, error = %e, "Failed to process event");
694
695 if self.dlq_config.enabled {{
696 // Send to DLQ
697 let dlq_topic = format!("{{}}{{}}", message.topic, self.dlq_config.suffix);
698 self.broker.send_to_dlq(&dlq_topic, &message).await?;
699 self.broker.commit(&message).await?;
700 warn!(event_id = %event_id, dlq_topic = %dlq_topic, "Event sent to DLQ");
701 }} else {{
702 return Err(e);
703 }}
704 }}
705 }}
706
707 Ok(())
708 }}
709
710 async fn process_with_retry(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
711 let mut attempts = 0;
712 let mut backoff = self.retry_config.initial_backoff;
713
714 loop {{
715 attempts += 1;
716
717 match self.handler_registry.handle_event(event_type, payload).await {{
718 Ok(()) => return Ok(()),
719 Err(e) if attempts >= self.retry_config.max_attempts => {{
720 return Err(e);
721 }}
722 Err(e) => {{
723 warn!(
724 attempt = attempts,
725 max_attempts = self.retry_config.max_attempts,
726 error = %e,
727 "Handler failed, retrying"
728 );
729
730 tokio::time::sleep(backoff).await;
731 backoff = std::cmp::min(
732 Duration::from_secs_f64(backoff.as_secs_f64() * self.retry_config.multiplier),
733 self.retry_config.max_backoff,
734 );
735 }}
736 }}
737 }}
738 }}
739}}
740"#,
741 pascal_name = pascal_name,
742 )
743}
744
745pub fn infrastructure_mod(_config: &ProjectConfig) -> String {
747 r#"//! Infrastructure layer - Message brokers and stores
748
749mod broker;
750mod idempotency;
751mod health;
752
753pub use broker::*;
754pub use idempotency::*;
755pub use health::*;
756"#
757 .to_string()
758}
759
760pub fn infrastructure_broker(config: &ProjectConfig) -> String {
762 let consumer = config.consumer.as_ref().unwrap();
763 let pascal_name = to_pascal_case(&consumer.service_name);
764
765 format!(
766 r#"//! Message broker implementations
767
768use std::time::Duration;
769use async_trait::async_trait;
770use tracing::{{info, debug}};
771
772use crate::config::BrokerConfig;
773use crate::application::Message;
774use crate::error::{{Result, {pascal_name}Error}};
775
776/// Message broker trait
777#[async_trait]
778pub trait MessageBroker: Send + Sync {{
779 /// Subscribe to topics
780 async fn subscribe(&self, topics: &[String]) -> Result<()>;
781
782 /// Poll for messages
783 async fn poll(&self, timeout: Duration) -> Result<Option<Message>>;
784
785 /// Commit message offset
786 async fn commit(&self, message: &Message) -> Result<()>;
787
788 /// Send message to DLQ
789 async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()>;
790}}
791
792/// Kafka message broker implementation
793pub struct KafkaMessageBroker {{
794 // In a real implementation, this would hold the rdkafka consumer
795 _config: BrokerConfig,
796}}
797
798impl KafkaMessageBroker {{
799 pub async fn new(config: &BrokerConfig) -> Result<Self> {{
800 info!("Connecting to Kafka brokers: {{}}", config.brokers);
801
802 // In a real implementation:
803 // let consumer: StreamConsumer = ClientConfig::new()
804 // .set("bootstrap.servers", &config.brokers)
805 // .set("group.id", &group_id)
806 // .set("enable.partition.eof", "false")
807 // .set("session.timeout.ms", "6000")
808 // .set("enable.auto.commit", "false")
809 // .create()?;
810
811 Ok(Self {{
812 _config: config.clone(),
813 }})
814 }}
815}}
816
817#[async_trait]
818impl MessageBroker for KafkaMessageBroker {{
819 async fn subscribe(&self, topics: &[String]) -> Result<()> {{
820 info!("Subscribing to topics: {{:?}}", topics);
821 // In a real implementation:
822 // self.consumer.subscribe(&topics.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;
823 Ok(())
824 }}
825
826 async fn poll(&self, timeout: Duration) -> Result<Option<Message>> {{
827 debug!("Polling for messages with timeout: {{:?}}", timeout);
828
829 // In a real implementation:
830 // match self.consumer.poll(timeout) {{
831 // Some(Ok(msg)) => {{
832 // let headers = msg.headers()
833 // .map(|h| {{
834 // (0..h.count())
835 // .filter_map(|i| {{
836 // h.get(i).map(|header| {{
837 // (header.key.to_string(), String::from_utf8_lossy(header.value.unwrap_or_default()).to_string())
838 // }})
839 // }})
840 // .collect()
841 // }})
842 // .unwrap_or_default();
843 //
844 // Ok(Some(Message {{
845 // key: msg.key().map(|k| String::from_utf8_lossy(k).to_string()),
846 // payload: msg.payload().unwrap_or_default().to_vec(),
847 // headers,
848 // topic: msg.topic().to_string(),
849 // partition: msg.partition(),
850 // offset: msg.offset(),
851 // }}))
852 // }}
853 // Some(Err(e)) => Err({pascal_name}Error::BrokerError(e.to_string())),
854 // None => Ok(None),
855 // }}
856
857 // Placeholder: simulate no messages
858 tokio::time::sleep(timeout).await;
859 Ok(None)
860 }}
861
862 async fn commit(&self, message: &Message) -> Result<()> {{
863 debug!(
864 topic = %message.topic,
865 partition = %message.partition,
866 offset = %message.offset,
867 "Committing offset"
868 );
869 // In a real implementation:
870 // self.consumer.commit_message(&message, CommitMode::Async)?;
871 Ok(())
872 }}
873
874 async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()> {{
875 info!(
876 dlq_topic = %dlq_topic,
877 original_topic = %message.topic,
878 "Sending message to DLQ"
879 );
880 // In a real implementation, use a producer to send to DLQ
881 Ok(())
882 }}
883}}
884"#,
885 pascal_name = pascal_name,
886 )
887}
888
889pub fn infrastructure_idempotency(config: &ProjectConfig) -> String {
891 let consumer = config.consumer.as_ref().unwrap();
892 let pascal_name = to_pascal_case(&consumer.service_name);
893
894 format!(
895 r#"//! Idempotency store implementations
896
897use std::collections::HashSet;
898use std::sync::RwLock;
899use async_trait::async_trait;
900
901use crate::error::{{Result, {pascal_name}Error}};
902
903/// Idempotency store trait
904#[async_trait]
905pub trait IdempotencyStore: Send + Sync {{
906 /// Check if an event has already been processed
907 async fn exists(&self, event_id: &str) -> Result<bool>;
908
909 /// Mark an event as processed
910 async fn mark_processed(&self, event_id: &str) -> Result<()>;
911}}
912
913/// In-memory idempotency store (for development/testing)
914pub struct InMemoryIdempotencyStore {{
915 processed: RwLock<HashSet<String>>,
916}}
917
918impl InMemoryIdempotencyStore {{
919 pub fn new() -> Self {{
920 Self {{
921 processed: RwLock::new(HashSet::new()),
922 }}
923 }}
924}}
925
926impl Default for InMemoryIdempotencyStore {{
927 fn default() -> Self {{
928 Self::new()
929 }}
930}}
931
932#[async_trait]
933impl IdempotencyStore for InMemoryIdempotencyStore {{
934 async fn exists(&self, event_id: &str) -> Result<bool> {{
935 let guard = self.processed.read()
936 .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
937 Ok(guard.contains(event_id))
938 }}
939
940 async fn mark_processed(&self, event_id: &str) -> Result<()> {{
941 let mut guard = self.processed.write()
942 .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
943 guard.insert(event_id.to_string());
944 Ok(())
945 }}
946}}
947
948// TODO: Add Redis implementation
949// pub struct RedisIdempotencyStore {{ ... }}
950
951// TODO: Add PostgreSQL implementation
952// pub struct PostgresIdempotencyStore {{ ... }}
953"#,
954 pascal_name = pascal_name,
955 )
956}
957
958pub fn infrastructure_health(_config: &ProjectConfig) -> String {
960 r#"//! Health check server
961
962use std::net::SocketAddr;
963use axum::{routing::get, Router, Json};
964use serde::Serialize;
965use tracing::info;
966
967/// Health response
968#[derive(Serialize)]
969pub struct HealthResponse {
970 pub status: String,
971 pub service: String,
972}
973
974/// Health check server
975pub struct HealthServer {
976 port: u16,
977}
978
979impl HealthServer {
980 pub fn new(port: u16) -> Self {
981 Self { port }
982 }
983
984 pub async fn run(&self) -> anyhow::Result<()> {
985 let app = Router::new()
986 .route("/health", get(health_handler))
987 .route("/ready", get(ready_handler));
988
989 let addr: SocketAddr = ([0, 0, 0, 0], self.port).into();
990 info!("Health server listening on {}", addr);
991
992 let listener = tokio::net::TcpListener::bind(addr).await?;
993 axum::serve(listener, app).await?;
994
995 Ok(())
996 }
997}
998
999async fn health_handler() -> Json<HealthResponse> {
1000 Json(HealthResponse {
1001 status: "healthy".to_string(),
1002 service: env!("CARGO_PKG_NAME").to_string(),
1003 })
1004}
1005
1006async fn ready_handler() -> Json<HealthResponse> {
1007 // TODO: Check actual readiness (broker connection, etc.)
1008 Json(HealthResponse {
1009 status: "ready".to_string(),
1010 service: env!("CARGO_PKG_NAME").to_string(),
1011 })
1012}
1013"#
1014 .to_string()
1015}
1016
1017pub fn readme(config: &ProjectConfig) -> String {
1019 let consumer = config.consumer.as_ref().unwrap();
1020
1021 format!(
1022 r#"# {display_name}
1023
1024An event consumer service built with AllFrame.
1025
1026## Features
1027
1028- **{broker} Consumer**: Event-driven message processing
1029- **Idempotency**: Duplicate event detection
1030- **Dead Letter Queue**: Failed message handling
1031- **Retry with Backoff**: Configurable retry strategy
1032- **Health Checks**: Kubernetes-ready endpoints
1033
1034## Configuration
1035
1036Set the following environment variables:
1037
1038```bash
1039# Broker configuration
1040{upper_name}_BROKERS=localhost:9092
1041{upper_name}_GROUP_ID={service_name}-group
1042{upper_name}_TOPICS=events
1043
1044# Retry configuration
1045{upper_name}_RETRY_MAX_ATTEMPTS=3
1046{upper_name}_RETRY_INITIAL_BACKOFF_MS=100
1047{upper_name}_RETRY_MAX_BACKOFF_MS=10000
1048
1049# DLQ configuration
1050{upper_name}_DLQ_ENABLED=true
1051{upper_name}_DLQ_SUFFIX=.dlq
1052
1053# Server configuration
1054{upper_name}_HEALTH_PORT=8081
1055{upper_name}_METRICS_PORT=9090
1056```
1057
1058## Running
1059
1060```bash
1061cargo run
1062```
1063
1064## Health Endpoints
1065
1066- `GET /health` - Liveness check
1067- `GET /ready` - Readiness check
1068
1069## Adding Event Handlers
1070
10711. Define your event type in `src/domain/events.rs`
10722. Create a handler in `src/domain/handlers.rs`
10733. Register it in the `HandlerRegistry`
1074
1075## Generated by AllFrame Ignite
1076"#,
1077 display_name = consumer.display_name,
1078 broker = consumer.broker,
1079 service_name = consumer.service_name,
1080 upper_name = consumer.service_name.to_uppercase().replace('-', "_"),
1081 )
1082}
1083
1084pub fn dockerfile(config: &ProjectConfig) -> String {
1086 let name = &config.name;
1087
1088 format!(
1089 r#"# Build stage
1090FROM rust:1.86-slim as builder
1091
1092WORKDIR /app
1093COPY . .
1094RUN cargo build --release
1095
1096# Runtime stage
1097FROM debian:bookworm-slim
1098
1099RUN apt-get update && apt-get install -y \
1100 ca-certificates \
1101 libssl3 \
1102 && rm -rf /var/lib/apt/lists/*
1103
1104WORKDIR /app
1105COPY --from=builder /app/target/release/{name} /app/{name}
1106
1107ENV RUST_LOG=info
1108
1109EXPOSE 8081
1110
1111CMD ["/app/{name}"]
1112"#,
1113 name = name,
1114 )
1115}
1116
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `to_pascal_case` on hyphenated, underscored and single-word input.
    #[test]
    fn test_to_pascal_case() {
        let cases = [
            ("order-processor", "OrderProcessor"),
            ("user_handler", "UserHandler"),
            ("simple", "Simple"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(to_pascal_case(input), expected);
        }
    }
}