Skip to main content

allframe_forge/templates/
consumer.rs

1//! Consumer archetype templates
2//!
3//! Templates for generating event consumer services with Kafka,
4//! idempotency, dead letter queues, and resilience patterns.
5
6use crate::config::{MessageBroker, ProjectConfig};
7
/// Convert a string to PascalCase.
///
/// The input is split on every character that is not alphanumeric (`-`, `_`,
/// spaces, dots, ...). The first character of each piece is upper-cased, the
/// rest of the piece is kept unchanged, and the pieces are concatenated.
/// Empty pieces (leading, trailing or repeated separators) contribute nothing.
///
/// Splitting on all non-alphanumeric characters rather than only `-` and `_`
/// keeps stray separators out of the result, which the template functions in
/// this file splice into generated Rust type names.
fn to_pascal_case(s: &str) -> String {
    s.split(|c: char| !c.is_alphanumeric())
        .map(|word| {
            let mut chars = word.chars();
            match chars.next() {
                // Empty piece between two adjacent separators.
                None => String::new(),
                // `to_uppercase` may yield more than one char, hence the chain.
                Some(first) => first.to_uppercase().chain(chars).collect(),
            }
        })
        .collect()
}
20
21/// Generate Cargo.toml for consumer project
22pub fn cargo_toml(config: &ProjectConfig) -> String {
23    let consumer = config.consumer.as_ref().unwrap();
24    let name = &config.name;
25
26    let broker_deps = match consumer.broker {
27        MessageBroker::Kafka => r#"rdkafka = { version = "0.36", features = ["cmake-build"] }"#,
28        MessageBroker::RabbitMq => r#"lapin = "2.3""#,
29        MessageBroker::Redis => {
30            r#"redis = { version = "0.25", features = ["tokio-comp", "streams"] }"#
31        }
32        MessageBroker::Sqs => r#"aws-sdk-sqs = "1.0""#,
33        MessageBroker::PubSub => r#"google-cloud-pubsub = "0.25""#,
34    };
35
36    format!(
37        r#"[package]
38name = "{name}"
39version = "0.1.0"
40edition = "2021"
41rust-version = "1.75"
42description = "{display_name}"
43
44[dependencies]
45# AllFrame
46allframe-core = {{ version = "0.1", features = ["resilience", "otel"] }}
47
48# Message Broker
49{broker_deps}
50
51# Async
52tokio = {{ version = "1", features = ["full"] }}
53async-trait = "0.1"
54futures = "0.3"
55
56# Serialization
57serde = {{ version = "1.0", features = ["derive"] }}
58serde_json = "1.0"
59
60# Error handling
61thiserror = "2.0"
62anyhow = "1.0"
63
64# Tracing & Metrics
65tracing = "0.1"
66tracing-subscriber = {{ version = "0.3", features = ["env-filter"] }}
67opentelemetry = {{ version = "0.27", features = ["metrics"] }}
68
69# Utilities
70chrono = {{ version = "0.4", features = ["serde"] }}
71uuid = {{ version = "1.0", features = ["v4", "serde"] }}
72dotenvy = "0.15"
73
74# Health checks
75axum = "0.7"
76
77[dev-dependencies]
78tokio-test = "0.4"
79mockall = "0.13"
80
81[[bin]]
82name = "{name}"
83path = "src/main.rs"
84"#,
85        name = name,
86        display_name = consumer.display_name,
87        broker_deps = broker_deps,
88    )
89}
90
91/// Generate main.rs
92pub fn main_rs(config: &ProjectConfig) -> String {
93    let consumer = config.consumer.as_ref().unwrap();
94    let pascal_name = to_pascal_case(&consumer.service_name);
95
96    format!(
97        r#"//! {display_name}
98//!
99//! An event consumer service with idempotency, retry, and dead letter queue handling.
100
101use std::sync::Arc;
102use tracing::info;
103
104mod config;
105mod error;
106mod domain;
107mod application;
108mod infrastructure;
109
110use config::Config;
111use application::{pascal_name}Consumer;
112use infrastructure::{{
113    KafkaMessageBroker,
114    InMemoryIdempotencyStore,
115    HealthServer,
116}};
117
118#[tokio::main]
119async fn main() -> anyhow::Result<()> {{
120    // Load environment variables
121    dotenvy::dotenv().ok();
122
123    // Initialize tracing
124    tracing_subscriber::fmt()
125        .with_env_filter(
126            tracing_subscriber::EnvFilter::from_default_env()
127                .add_directive(tracing::Level::INFO.into()),
128        )
129        .init();
130
131    // Load configuration
132    let config = Config::from_env();
133    info!("Starting {display_name}");
134    info!("Broker: {{}}", config.broker.brokers);
135    info!("Topics: {{:?}}", config.topics);
136
137    // Create idempotency store
138    let idempotency_store = Arc::new(InMemoryIdempotencyStore::new());
139
140    // Create message broker
141    let broker = KafkaMessageBroker::new(&config.broker).await?;
142
143    // Create consumer
144    let consumer = {pascal_name}Consumer::new(
145        broker,
146        idempotency_store,
147        config.retry.clone(),
148        config.dlq.clone(),
149    );
150
151    // Start health server in background
152    let health_handle = tokio::spawn(async move {{
153        let health_server = HealthServer::new(config.server.health_port);
154        health_server.run().await
155    }});
156
157    // Run consumer
158    info!("Consumer started, waiting for messages...");
159    consumer.run(&config.topics).await?;
160
161    health_handle.abort();
162    info!("Consumer shutdown complete");
163    Ok(())
164}}
165"#,
166        display_name = consumer.display_name,
167        pascal_name = pascal_name,
168    )
169}
170
171/// Generate config.rs
172pub fn config_rs(config: &ProjectConfig) -> String {
173    let consumer = config.consumer.as_ref().unwrap();
174    let service_name = &consumer.service_name;
175    let upper_name = service_name.to_uppercase().replace('-', "_");
176
177    format!(
178        r#"//! Configuration module
179//!
180//! Loads configuration from environment variables.
181
182use std::time::Duration;
183
184/// Main configuration
185#[derive(Debug, Clone)]
186pub struct Config {{
187    pub broker: BrokerConfig,
188    pub topics: Vec<String>,
189    pub group_id: String,
190    pub retry: RetryConfig,
191    pub dlq: DlqConfig,
192    pub server: ServerConfig,
193}}
194
195/// Message broker configuration
196#[derive(Debug, Clone)]
197pub struct BrokerConfig {{
198    pub brokers: String,
199    pub security_protocol: String,
200    pub sasl_mechanism: Option<String>,
201    pub sasl_username: Option<String>,
202    pub sasl_password: Option<String>,
203}}
204
205/// Retry configuration
206#[derive(Debug, Clone)]
207pub struct RetryConfig {{
208    pub max_attempts: u32,
209    pub initial_backoff: Duration,
210    pub max_backoff: Duration,
211    pub multiplier: f64,
212}}
213
214/// Dead Letter Queue configuration
215#[derive(Debug, Clone)]
216pub struct DlqConfig {{
217    pub enabled: bool,
218    pub suffix: String,
219}}
220
221/// Server configuration
222#[derive(Debug, Clone)]
223pub struct ServerConfig {{
224    pub health_port: u16,
225    pub metrics_port: u16,
226}}
227
228impl Config {{
229    pub fn from_env() -> Self {{
230        Self {{
231            broker: BrokerConfig {{
232                brokers: std::env::var("{upper_name}_BROKERS")
233                    .unwrap_or_else(|_| "localhost:9092".to_string()),
234                security_protocol: std::env::var("{upper_name}_SECURITY_PROTOCOL")
235                    .unwrap_or_else(|_| "PLAINTEXT".to_string()),
236                sasl_mechanism: std::env::var("{upper_name}_SASL_MECHANISM").ok(),
237                sasl_username: std::env::var("{upper_name}_SASL_USERNAME").ok(),
238                sasl_password: std::env::var("{upper_name}_SASL_PASSWORD").ok(),
239            }},
240            topics: std::env::var("{upper_name}_TOPICS")
241                .unwrap_or_else(|_| "events".to_string())
242                .split(',')
243                .map(|s| s.trim().to_string())
244                .collect(),
245            group_id: std::env::var("{upper_name}_GROUP_ID")
246                .unwrap_or_else(|_| "{service_name}-group".to_string()),
247            retry: RetryConfig {{
248                max_attempts: std::env::var("{upper_name}_RETRY_MAX_ATTEMPTS")
249                    .ok()
250                    .and_then(|v| v.parse().ok())
251                    .unwrap_or({max_attempts}),
252                initial_backoff: Duration::from_millis(
253                    std::env::var("{upper_name}_RETRY_INITIAL_BACKOFF_MS")
254                        .ok()
255                        .and_then(|v| v.parse().ok())
256                        .unwrap_or({initial_backoff_ms}),
257                ),
258                max_backoff: Duration::from_millis(
259                    std::env::var("{upper_name}_RETRY_MAX_BACKOFF_MS")
260                        .ok()
261                        .and_then(|v| v.parse().ok())
262                        .unwrap_or({max_backoff_ms}),
263                ),
264                multiplier: std::env::var("{upper_name}_RETRY_MULTIPLIER")
265                    .ok()
266                    .and_then(|v| v.parse().ok())
267                    .unwrap_or({multiplier}),
268            }},
269            dlq: DlqConfig {{
270                enabled: std::env::var("{upper_name}_DLQ_ENABLED")
271                    .ok()
272                    .and_then(|v| v.parse().ok())
273                    .unwrap_or(true),
274                suffix: std::env::var("{upper_name}_DLQ_SUFFIX")
275                    .unwrap_or_else(|_| ".dlq".to_string()),
276            }},
277            server: ServerConfig {{
278                health_port: std::env::var("{upper_name}_HEALTH_PORT")
279                    .ok()
280                    .and_then(|v| v.parse().ok())
281                    .unwrap_or({health_port}),
282                metrics_port: std::env::var("{upper_name}_METRICS_PORT")
283                    .ok()
284                    .and_then(|v| v.parse().ok())
285                    .unwrap_or({metrics_port}),
286            }},
287        }}
288    }}
289}}
290"#,
291        upper_name = upper_name,
292        service_name = service_name,
293        max_attempts = consumer.retry.max_attempts,
294        initial_backoff_ms = consumer.retry.initial_backoff_ms,
295        max_backoff_ms = consumer.retry.max_backoff_ms,
296        multiplier = consumer.retry.multiplier,
297        health_port = consumer.server.health_port,
298        metrics_port = consumer.server.metrics_port,
299    )
300}
301
302/// Generate error.rs
303pub fn error_rs(config: &ProjectConfig) -> String {
304    let consumer = config.consumer.as_ref().unwrap();
305    let pascal_name = to_pascal_case(&consumer.service_name);
306
307    format!(
308        r#"//! Error types for the consumer service
309
310use thiserror::Error;
311
312/// Consumer error type
313#[derive(Error, Debug)]
314pub enum {pascal_name}Error {{
315    #[error("Broker error: {{0}}")]
316    BrokerError(String),
317
318    #[error("Deserialization error: {{0}}")]
319    DeserializationError(String),
320
321    #[error("Handler error: {{0}}")]
322    HandlerError(String),
323
324    #[error("Idempotency error: {{0}}")]
325    IdempotencyError(String),
326
327    #[error("DLQ error: {{0}}")]
328    DlqError(String),
329
330    #[error("Configuration error: {{0}}")]
331    ConfigError(String),
332}}
333
334/// Result type alias
335pub type Result<T> = std::result::Result<T, {pascal_name}Error>;
336
337impl From<{pascal_name}Error> for tonic::Status {{
338    fn from(e: {pascal_name}Error) -> Self {{
339        tonic::Status::internal(e.to_string())
340    }}
341}}
342"#,
343        pascal_name = pascal_name,
344    )
345}
346
347/// Generate domain/mod.rs
348pub fn domain_mod(_config: &ProjectConfig) -> String {
349    r#"//! Domain layer - Events and handlers
350
351pub mod events;
352pub mod handlers;
353
354pub use events::*;
355pub use handlers::*;
356"#
357    .to_string()
358}
359
360/// Generate domain/events.rs
361pub fn domain_events(_config: &ProjectConfig) -> String {
362    r#"//! Domain events
363
364use serde::{Deserialize, Serialize};
365use chrono::{DateTime, Utc};
366use uuid::Uuid;
367
368/// Base event envelope
369#[derive(Debug, Clone, Serialize, Deserialize)]
370pub struct EventEnvelope<T> {
371    /// Unique event ID (used for idempotency)
372    pub id: Uuid,
373    /// Event type name
374    pub event_type: String,
375    /// Event timestamp
376    pub timestamp: DateTime<Utc>,
377    /// Correlation ID for tracing
378    pub correlation_id: Option<Uuid>,
379    /// Event payload
380    pub payload: T,
381    /// Event metadata
382    pub metadata: EventMetadata,
383}
384
385/// Event metadata
386#[derive(Debug, Clone, Default, Serialize, Deserialize)]
387pub struct EventMetadata {
388    /// Source service
389    pub source: Option<String>,
390    /// Event version
391    pub version: Option<String>,
392    /// Retry count
393    pub retry_count: u32,
394}
395
396impl<T> EventEnvelope<T> {
397    /// Create a new event envelope
398    pub fn new(event_type: impl Into<String>, payload: T) -> Self {
399        Self {
400            id: Uuid::new_v4(),
401            event_type: event_type.into(),
402            timestamp: Utc::now(),
403            correlation_id: None,
404            payload,
405            metadata: EventMetadata::default(),
406        }
407    }
408
409    /// Set correlation ID
410    pub fn with_correlation_id(mut self, id: Uuid) -> Self {
411        self.correlation_id = Some(id);
412        self
413    }
414}
415
416/// Example domain event: User Created
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct UserCreated {
419    pub user_id: Uuid,
420    pub email: String,
421    pub name: String,
422}
423
424/// Example domain event: Order Placed
425#[derive(Debug, Clone, Serialize, Deserialize)]
426pub struct OrderPlaced {
427    pub order_id: Uuid,
428    pub user_id: Uuid,
429    pub total: f64,
430    pub items: Vec<OrderItem>,
431}
432
433/// Order item
434#[derive(Debug, Clone, Serialize, Deserialize)]
435pub struct OrderItem {
436    pub product_id: Uuid,
437    pub quantity: u32,
438    pub price: f64,
439}
440"#
441    .to_string()
442}
443
444/// Generate domain/handlers.rs
445pub fn domain_handlers(config: &ProjectConfig) -> String {
446    let consumer = config.consumer.as_ref().unwrap();
447    let pascal_name = to_pascal_case(&consumer.service_name);
448
449    format!(
450        r#"//! Event handlers
451
452use async_trait::async_trait;
453use tracing::{{info, warn}};
454
455use crate::domain::events::*;
456use crate::error::Result;
457
458/// Event handler trait
459#[async_trait]
460pub trait EventHandler<E>: Send + Sync {{
461    /// Handle an event
462    async fn handle(&self, event: EventEnvelope<E>) -> Result<()>;
463}}
464
465/// User created event handler
466pub struct UserCreatedHandler;
467
468#[async_trait]
469impl EventHandler<UserCreated> for UserCreatedHandler {{
470    async fn handle(&self, event: EventEnvelope<UserCreated>) -> Result<()> {{
471        info!(
472            user_id = %event.payload.user_id,
473            email = %event.payload.email,
474            "Processing UserCreated event"
475        );
476
477        // TODO: Implement your business logic here
478        // Examples:
479        // - Send welcome email
480        // - Create user profile
481        // - Initialize user preferences
482
483        Ok(())
484    }}
485}}
486
487/// Order placed event handler
488pub struct OrderPlacedHandler;
489
490#[async_trait]
491impl EventHandler<OrderPlaced> for OrderPlacedHandler {{
492    async fn handle(&self, event: EventEnvelope<OrderPlaced>) -> Result<()> {{
493        info!(
494            order_id = %event.payload.order_id,
495            user_id = %event.payload.user_id,
496            total = %event.payload.total,
497            "Processing OrderPlaced event"
498        );
499
500        // TODO: Implement your business logic here
501        // Examples:
502        // - Reserve inventory
503        // - Process payment
504        // - Send confirmation email
505
506        Ok(())
507    }}
508}}
509
510/// Handler registry for routing events to handlers
511pub struct {pascal_name}HandlerRegistry {{
512    user_created_handler: UserCreatedHandler,
513    order_placed_handler: OrderPlacedHandler,
514}}
515
516impl Default for {pascal_name}HandlerRegistry {{
517    fn default() -> Self {{
518        Self::new()
519    }}
520}}
521
522impl {pascal_name}HandlerRegistry {{
523    pub fn new() -> Self {{
524        Self {{
525            user_created_handler: UserCreatedHandler,
526            order_placed_handler: OrderPlacedHandler,
527        }}
528    }}
529
530    /// Route and handle an event based on its type
531    pub async fn handle_event(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
532        match event_type {{
533            "UserCreated" => {{
534                let event: EventEnvelope<UserCreated> = serde_json::from_slice(payload)
535                    .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
536                self.user_created_handler.handle(event).await
537            }}
538            "OrderPlaced" => {{
539                let event: EventEnvelope<OrderPlaced> = serde_json::from_slice(payload)
540                    .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
541                self.order_placed_handler.handle(event).await
542            }}
543            _ => {{
544                warn!(event_type = %event_type, "Unknown event type, skipping");
545                Ok(())
546            }}
547        }}
548    }}
549}}
550"#,
551        pascal_name = pascal_name,
552    )
553}
554
555/// Generate application/mod.rs
556pub fn application_mod(_config: &ProjectConfig) -> String {
557    r#"//! Application layer - Consumer orchestration
558
559pub mod consumer;
560
561pub use consumer::*;
562"#
563    .to_string()
564}
565
566/// Generate application/consumer.rs
567pub fn application_consumer(config: &ProjectConfig) -> String {
568    let consumer = config.consumer.as_ref().unwrap();
569    let pascal_name = to_pascal_case(&consumer.service_name);
570
571    format!(
572        r#"//! Main consumer implementation
573
574use std::sync::Arc;
575use std::time::Duration;
576use async_trait::async_trait;
577use tracing::{{info, warn, error, instrument}};
578
579use crate::config::{{RetryConfig, DlqConfig}};
580use crate::domain::{pascal_name}HandlerRegistry;
581use crate::error::{{Result, {pascal_name}Error}};
582use crate::infrastructure::{{MessageBroker, IdempotencyStore}};
583
584/// Message from the broker
585#[derive(Debug, Clone)]
586pub struct Message {{
587    /// Message key
588    pub key: Option<String>,
589    /// Message payload
590    pub payload: Vec<u8>,
591    /// Message headers
592    pub headers: Vec<(String, String)>,
593    /// Topic
594    pub topic: String,
595    /// Partition
596    pub partition: i32,
597    /// Offset
598    pub offset: i64,
599}}
600
601impl Message {{
602    /// Get the event ID from headers
603    pub fn event_id(&self) -> Option<String> {{
604        self.headers
605            .iter()
606            .find(|(k, _)| k == "event_id")
607            .map(|(_, v)| v.clone())
608    }}
609
610    /// Get the event type from headers
611    pub fn event_type(&self) -> Option<String> {{
612        self.headers
613            .iter()
614            .find(|(k, _)| k == "event_type")
615            .map(|(_, v)| v.clone())
616    }}
617}}
618
619/// Main consumer service
620pub struct {pascal_name}Consumer<B: MessageBroker, I: IdempotencyStore> {{
621    broker: B,
622    idempotency_store: Arc<I>,
623    handler_registry: {pascal_name}HandlerRegistry,
624    retry_config: RetryConfig,
625    dlq_config: DlqConfig,
626}}
627
628impl<B: MessageBroker, I: IdempotencyStore> {pascal_name}Consumer<B, I> {{
629    pub fn new(
630        broker: B,
631        idempotency_store: Arc<I>,
632        retry_config: RetryConfig,
633        dlq_config: DlqConfig,
634    ) -> Self {{
635        Self {{
636            broker,
637            idempotency_store,
638            handler_registry: {pascal_name}HandlerRegistry::new(),
639            retry_config,
640            dlq_config,
641        }}
642    }}
643
644    /// Run the consumer loop
645    pub async fn run(&self, topics: &[String]) -> Result<()> {{
646        info!("Subscribing to topics: {{:?}}", topics);
647        self.broker.subscribe(topics).await?;
648
649        loop {{
650            match self.broker.poll(Duration::from_secs(1)).await {{
651                Ok(Some(message)) => {{
652                    if let Err(e) = self.process_message(message).await {{
653                        error!("Error processing message: {{}}", e);
654                    }}
655                }}
656                Ok(None) => {{
657                    // No message available, continue polling
658                }}
659                Err(e) => {{
660                    error!("Error polling for messages: {{}}", e);
661                    tokio::time::sleep(Duration::from_secs(1)).await;
662                }}
663            }}
664        }}
665    }}
666
667    #[instrument(skip(self, message), fields(topic = %message.topic, partition = %message.partition, offset = %message.offset))]
668    async fn process_message(&self, message: Message) -> Result<()> {{
669        let event_id = message.event_id().unwrap_or_else(|| {{
670            format!("{{}}:{{}}:{{}}", message.topic, message.partition, message.offset)
671        }});
672
673        // Check idempotency
674        if self.idempotency_store.exists(&event_id).await? {{
675            info!(event_id = %event_id, "Event already processed, skipping");
676            self.broker.commit(&message).await?;
677            return Ok(());
678        }}
679
680        let event_type = message.event_type().unwrap_or_else(|| "Unknown".to_string());
681
682        // Process with retry
683        let result = self.process_with_retry(&event_type, &message.payload).await;
684
685        match result {{
686            Ok(()) => {{
687                // Mark as processed
688                self.idempotency_store.mark_processed(&event_id).await?;
689                self.broker.commit(&message).await?;
690                info!(event_id = %event_id, "Event processed successfully");
691            }}
692            Err(e) => {{
693                error!(event_id = %event_id, error = %e, "Failed to process event");
694
695                if self.dlq_config.enabled {{
696                    // Send to DLQ
697                    let dlq_topic = format!("{{}}{{}}", message.topic, self.dlq_config.suffix);
698                    self.broker.send_to_dlq(&dlq_topic, &message).await?;
699                    self.broker.commit(&message).await?;
700                    warn!(event_id = %event_id, dlq_topic = %dlq_topic, "Event sent to DLQ");
701                }} else {{
702                    return Err(e);
703                }}
704            }}
705        }}
706
707        Ok(())
708    }}
709
710    async fn process_with_retry(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
711        let mut attempts = 0;
712        let mut backoff = self.retry_config.initial_backoff;
713
714        loop {{
715            attempts += 1;
716
717            match self.handler_registry.handle_event(event_type, payload).await {{
718                Ok(()) => return Ok(()),
719                Err(e) if attempts >= self.retry_config.max_attempts => {{
720                    return Err(e);
721                }}
722                Err(e) => {{
723                    warn!(
724                        attempt = attempts,
725                        max_attempts = self.retry_config.max_attempts,
726                        error = %e,
727                        "Handler failed, retrying"
728                    );
729
730                    tokio::time::sleep(backoff).await;
731                    backoff = std::cmp::min(
732                        Duration::from_secs_f64(backoff.as_secs_f64() * self.retry_config.multiplier),
733                        self.retry_config.max_backoff,
734                    );
735                }}
736            }}
737        }}
738    }}
739}}
740"#,
741        pascal_name = pascal_name,
742    )
743}
744
745/// Generate infrastructure/mod.rs
746pub fn infrastructure_mod(_config: &ProjectConfig) -> String {
747    r#"//! Infrastructure layer - Message brokers and stores
748
749mod broker;
750mod idempotency;
751mod health;
752
753pub use broker::*;
754pub use idempotency::*;
755pub use health::*;
756"#
757    .to_string()
758}
759
760/// Generate infrastructure/broker.rs
761pub fn infrastructure_broker(config: &ProjectConfig) -> String {
762    let consumer = config.consumer.as_ref().unwrap();
763    let pascal_name = to_pascal_case(&consumer.service_name);
764
765    format!(
766        r#"//! Message broker implementations
767
768use std::time::Duration;
769use async_trait::async_trait;
770use tracing::{{info, debug}};
771
772use crate::config::BrokerConfig;
773use crate::application::Message;
774use crate::error::{{Result, {pascal_name}Error}};
775
776/// Message broker trait
777#[async_trait]
778pub trait MessageBroker: Send + Sync {{
779    /// Subscribe to topics
780    async fn subscribe(&self, topics: &[String]) -> Result<()>;
781
782    /// Poll for messages
783    async fn poll(&self, timeout: Duration) -> Result<Option<Message>>;
784
785    /// Commit message offset
786    async fn commit(&self, message: &Message) -> Result<()>;
787
788    /// Send message to DLQ
789    async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()>;
790}}
791
792/// Kafka message broker implementation
793pub struct KafkaMessageBroker {{
794    // In a real implementation, this would hold the rdkafka consumer
795    _config: BrokerConfig,
796}}
797
798impl KafkaMessageBroker {{
799    pub async fn new(config: &BrokerConfig) -> Result<Self> {{
800        info!("Connecting to Kafka brokers: {{}}", config.brokers);
801
802        // In a real implementation:
803        // let consumer: StreamConsumer = ClientConfig::new()
804        //     .set("bootstrap.servers", &config.brokers)
805        //     .set("group.id", &group_id)
806        //     .set("enable.partition.eof", "false")
807        //     .set("session.timeout.ms", "6000")
808        //     .set("enable.auto.commit", "false")
809        //     .create()?;
810
811        Ok(Self {{
812            _config: config.clone(),
813        }})
814    }}
815}}
816
817#[async_trait]
818impl MessageBroker for KafkaMessageBroker {{
819    async fn subscribe(&self, topics: &[String]) -> Result<()> {{
820        info!("Subscribing to topics: {{:?}}", topics);
821        // In a real implementation:
822        // self.consumer.subscribe(&topics.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;
823        Ok(())
824    }}
825
826    async fn poll(&self, timeout: Duration) -> Result<Option<Message>> {{
827        debug!("Polling for messages with timeout: {{:?}}", timeout);
828
829        // In a real implementation:
830        // match self.consumer.poll(timeout) {{
831        //     Some(Ok(msg)) => {{
832        //         let headers = msg.headers()
833        //             .map(|h| {{
834        //                 (0..h.count())
835        //                     .filter_map(|i| {{
836        //                         h.get(i).map(|header| {{
837        //                             (header.key.to_string(), String::from_utf8_lossy(header.value.unwrap_or_default()).to_string())
838        //                         }})
839        //                     }})
840        //                     .collect()
841        //             }})
842        //             .unwrap_or_default();
843        //
844        //         Ok(Some(Message {{
845        //             key: msg.key().map(|k| String::from_utf8_lossy(k).to_string()),
846        //             payload: msg.payload().unwrap_or_default().to_vec(),
847        //             headers,
848        //             topic: msg.topic().to_string(),
849        //             partition: msg.partition(),
850        //             offset: msg.offset(),
851        //         }}))
852        //     }}
853        //     Some(Err(e)) => Err({pascal_name}Error::BrokerError(e.to_string())),
854        //     None => Ok(None),
855        // }}
856
857        // Placeholder: simulate no messages
858        tokio::time::sleep(timeout).await;
859        Ok(None)
860    }}
861
862    async fn commit(&self, message: &Message) -> Result<()> {{
863        debug!(
864            topic = %message.topic,
865            partition = %message.partition,
866            offset = %message.offset,
867            "Committing offset"
868        );
869        // In a real implementation:
870        // self.consumer.commit_message(&message, CommitMode::Async)?;
871        Ok(())
872    }}
873
874    async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()> {{
875        info!(
876            dlq_topic = %dlq_topic,
877            original_topic = %message.topic,
878            "Sending message to DLQ"
879        );
880        // In a real implementation, use a producer to send to DLQ
881        Ok(())
882    }}
883}}
884"#,
885        pascal_name = pascal_name,
886    )
887}
888
889/// Generate infrastructure/idempotency.rs
890pub fn infrastructure_idempotency(config: &ProjectConfig) -> String {
891    let consumer = config.consumer.as_ref().unwrap();
892    let pascal_name = to_pascal_case(&consumer.service_name);
893
894    format!(
895        r#"//! Idempotency store implementations
896
897use std::collections::HashSet;
898use std::sync::RwLock;
899use async_trait::async_trait;
900
901use crate::error::{{Result, {pascal_name}Error}};
902
903/// Idempotency store trait
904#[async_trait]
905pub trait IdempotencyStore: Send + Sync {{
906    /// Check if an event has already been processed
907    async fn exists(&self, event_id: &str) -> Result<bool>;
908
909    /// Mark an event as processed
910    async fn mark_processed(&self, event_id: &str) -> Result<()>;
911}}
912
913/// In-memory idempotency store (for development/testing)
914pub struct InMemoryIdempotencyStore {{
915    processed: RwLock<HashSet<String>>,
916}}
917
918impl InMemoryIdempotencyStore {{
919    pub fn new() -> Self {{
920        Self {{
921            processed: RwLock::new(HashSet::new()),
922        }}
923    }}
924}}
925
926impl Default for InMemoryIdempotencyStore {{
927    fn default() -> Self {{
928        Self::new()
929    }}
930}}
931
932#[async_trait]
933impl IdempotencyStore for InMemoryIdempotencyStore {{
934    async fn exists(&self, event_id: &str) -> Result<bool> {{
935        let guard = self.processed.read()
936            .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
937        Ok(guard.contains(event_id))
938    }}
939
940    async fn mark_processed(&self, event_id: &str) -> Result<()> {{
941        let mut guard = self.processed.write()
942            .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
943        guard.insert(event_id.to_string());
944        Ok(())
945    }}
946}}
947
948// TODO: Add Redis implementation
949// pub struct RedisIdempotencyStore {{ ... }}
950
951// TODO: Add PostgreSQL implementation
952// pub struct PostgresIdempotencyStore {{ ... }}
953"#,
954        pascal_name = pascal_name,
955    )
956}
957
958/// Generate infrastructure/health.rs
959pub fn infrastructure_health(_config: &ProjectConfig) -> String {
960    r#"//! Health check server
961
962use std::net::SocketAddr;
963use axum::{routing::get, Router, Json};
964use serde::Serialize;
965use tracing::info;
966
967/// Health response
968#[derive(Serialize)]
969pub struct HealthResponse {
970    pub status: String,
971    pub service: String,
972}
973
974/// Health check server
975pub struct HealthServer {
976    port: u16,
977}
978
979impl HealthServer {
980    pub fn new(port: u16) -> Self {
981        Self { port }
982    }
983
984    pub async fn run(&self) -> anyhow::Result<()> {
985        let app = Router::new()
986            .route("/health", get(health_handler))
987            .route("/ready", get(ready_handler));
988
989        let addr: SocketAddr = ([0, 0, 0, 0], self.port).into();
990        info!("Health server listening on {}", addr);
991
992        let listener = tokio::net::TcpListener::bind(addr).await?;
993        axum::serve(listener, app).await?;
994
995        Ok(())
996    }
997}
998
999async fn health_handler() -> Json<HealthResponse> {
1000    Json(HealthResponse {
1001        status: "healthy".to_string(),
1002        service: env!("CARGO_PKG_NAME").to_string(),
1003    })
1004}
1005
1006async fn ready_handler() -> Json<HealthResponse> {
1007    // TODO: Check actual readiness (broker connection, etc.)
1008    Json(HealthResponse {
1009        status: "ready".to_string(),
1010        service: env!("CARGO_PKG_NAME").to_string(),
1011    })
1012}
1013"#
1014    .to_string()
1015}
1016
1017/// Generate README.md
1018pub fn readme(config: &ProjectConfig) -> String {
1019    let consumer = config.consumer.as_ref().unwrap();
1020
1021    format!(
1022        r#"# {display_name}
1023
1024An event consumer service built with AllFrame.
1025
1026## Features
1027
1028- **{broker} Consumer**: Event-driven message processing
1029- **Idempotency**: Duplicate event detection
1030- **Dead Letter Queue**: Failed message handling
1031- **Retry with Backoff**: Configurable retry strategy
1032- **Health Checks**: Kubernetes-ready endpoints
1033
1034## Configuration
1035
1036Set the following environment variables:
1037
1038```bash
1039# Broker configuration
1040{upper_name}_BROKERS=localhost:9092
1041{upper_name}_GROUP_ID={service_name}-group
1042{upper_name}_TOPICS=events
1043
1044# Retry configuration
1045{upper_name}_RETRY_MAX_ATTEMPTS=3
1046{upper_name}_RETRY_INITIAL_BACKOFF_MS=100
1047{upper_name}_RETRY_MAX_BACKOFF_MS=10000
1048
1049# DLQ configuration
1050{upper_name}_DLQ_ENABLED=true
1051{upper_name}_DLQ_SUFFIX=.dlq
1052
1053# Server configuration
1054{upper_name}_HEALTH_PORT=8081
1055{upper_name}_METRICS_PORT=9090
1056```
1057
1058## Running
1059
1060```bash
1061cargo run
1062```
1063
1064## Health Endpoints
1065
1066- `GET /health` - Liveness check
1067- `GET /ready` - Readiness check
1068
1069## Adding Event Handlers
1070
10711. Define your event type in `src/domain/events.rs`
10722. Create a handler in `src/domain/handlers.rs`
10733. Register it in the `HandlerRegistry`
1074
1075## Generated by AllFrame Ignite
1076"#,
1077        display_name = consumer.display_name,
1078        broker = consumer.broker,
1079        service_name = consumer.service_name,
1080        upper_name = consumer.service_name.to_uppercase().replace('-', "_"),
1081    )
1082}
1083
1084/// Generate Dockerfile
1085pub fn dockerfile(config: &ProjectConfig) -> String {
1086    let name = &config.name;
1087
1088    format!(
1089        r#"# Build stage
1090FROM rust:1.86-slim as builder
1091
1092WORKDIR /app
1093COPY . .
1094RUN cargo build --release
1095
1096# Runtime stage
1097FROM debian:bookworm-slim
1098
1099RUN apt-get update && apt-get install -y \
1100    ca-certificates \
1101    libssl3 \
1102    && rm -rf /var/lib/apt/lists/*
1103
1104WORKDIR /app
1105COPY --from=builder /app/target/release/{name} /app/{name}
1106
1107ENV RUST_LOG=info
1108
1109EXPOSE 8081
1110
1111CMD ["/app/{name}"]
1112"#,
1113        name = name,
1114    )
1115}
1116
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks a hyphenated, a snake_case and a single-word input.
    #[test]
    fn test_to_pascal_case() {
        let cases = [
            ("order-processor", "OrderProcessor"),
            ("user_handler", "UserHandler"),
            ("simple", "Simple"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(to_pascal_case(input), expected);
        }
    }
}