allframe_forge/templates/
consumer.rs

1//! Consumer archetype templates
2//!
3//! Templates for generating event consumer services with Kafka,
4//! idempotency, dead letter queues, and resilience patterns.
5
6use crate::config::{MessageBroker, ProjectConfig};
7
8/// Convert a string to PascalCase
9fn to_pascal_case(s: &str) -> String {
10    s.split(|c| c == '-' || c == '_')
11        .map(|word| {
12            let mut chars = word.chars();
13            match chars.next() {
14                None => String::new(),
15                Some(first) => first.to_uppercase().chain(chars).collect(),
16            }
17        })
18        .collect()
19}
20
21/// Generate Cargo.toml for consumer project
22pub fn cargo_toml(config: &ProjectConfig) -> String {
23    let consumer = config.consumer.as_ref().unwrap();
24    let name = &config.name;
25
26    let broker_deps = match consumer.broker {
27        MessageBroker::Kafka => r#"rdkafka = { version = "0.36", features = ["cmake-build"] }"#,
28        MessageBroker::RabbitMq => r#"lapin = "2.3""#,
29        MessageBroker::Redis => r#"redis = { version = "0.25", features = ["tokio-comp", "streams"] }"#,
30        MessageBroker::Sqs => r#"aws-sdk-sqs = "1.0""#,
31        MessageBroker::PubSub => r#"google-cloud-pubsub = "0.25""#,
32    };
33
34    format!(
35        r#"[package]
36name = "{name}"
37version = "0.1.0"
38edition = "2021"
39rust-version = "1.75"
40description = "{display_name}"
41
42[dependencies]
43# AllFrame
44allframe-core = {{ version = "0.1", features = ["resilience", "otel"] }}
45
46# Message Broker
47{broker_deps}
48
49# Async
50tokio = {{ version = "1", features = ["full"] }}
51async-trait = "0.1"
52futures = "0.3"
53
54# Serialization
55serde = {{ version = "1.0", features = ["derive"] }}
56serde_json = "1.0"
57
58# Error handling
59thiserror = "2.0"
60anyhow = "1.0"
61
62# Tracing & Metrics
63tracing = "0.1"
64tracing-subscriber = {{ version = "0.3", features = ["env-filter"] }}
65opentelemetry = {{ version = "0.27", features = ["metrics"] }}
66
67# Utilities
68chrono = {{ version = "0.4", features = ["serde"] }}
69uuid = {{ version = "1.0", features = ["v4", "serde"] }}
70dotenvy = "0.15"
71
72# Health checks
73axum = "0.7"
74
75[dev-dependencies]
76tokio-test = "0.4"
77mockall = "0.13"
78
79[[bin]]
80name = "{name}"
81path = "src/main.rs"
82"#,
83        name = name,
84        display_name = consumer.display_name,
85        broker_deps = broker_deps,
86    )
87}
88
89/// Generate main.rs
90pub fn main_rs(config: &ProjectConfig) -> String {
91    let consumer = config.consumer.as_ref().unwrap();
92    let pascal_name = to_pascal_case(&consumer.service_name);
93
94    format!(
95        r#"//! {display_name}
96//!
97//! An event consumer service with idempotency, retry, and dead letter queue handling.
98
99use std::sync::Arc;
100use tracing::info;
101
102mod config;
103mod error;
104mod domain;
105mod application;
106mod infrastructure;
107
108use config::Config;
109use application::{pascal_name}Consumer;
110use infrastructure::{{
111    KafkaMessageBroker,
112    InMemoryIdempotencyStore,
113    HealthServer,
114}};
115
116#[tokio::main]
117async fn main() -> anyhow::Result<()> {{
118    // Load environment variables
119    dotenvy::dotenv().ok();
120
121    // Initialize tracing
122    tracing_subscriber::fmt()
123        .with_env_filter(
124            tracing_subscriber::EnvFilter::from_default_env()
125                .add_directive(tracing::Level::INFO.into()),
126        )
127        .init();
128
129    // Load configuration
130    let config = Config::from_env();
131    info!("Starting {display_name}");
132    info!("Broker: {{}}", config.broker.brokers);
133    info!("Topics: {{:?}}", config.topics);
134
135    // Create idempotency store
136    let idempotency_store = Arc::new(InMemoryIdempotencyStore::new());
137
138    // Create message broker
139    let broker = KafkaMessageBroker::new(&config.broker).await?;
140
141    // Create consumer
142    let consumer = {pascal_name}Consumer::new(
143        broker,
144        idempotency_store,
145        config.retry.clone(),
146        config.dlq.clone(),
147    );
148
149    // Start health server in background
150    let health_handle = tokio::spawn(async move {{
151        let health_server = HealthServer::new(config.server.health_port);
152        health_server.run().await
153    }});
154
155    // Run consumer
156    info!("Consumer started, waiting for messages...");
157    consumer.run(&config.topics).await?;
158
159    health_handle.abort();
160    info!("Consumer shutdown complete");
161    Ok(())
162}}
163"#,
164        display_name = consumer.display_name,
165        pascal_name = pascal_name,
166    )
167}
168
169/// Generate config.rs
170pub fn config_rs(config: &ProjectConfig) -> String {
171    let consumer = config.consumer.as_ref().unwrap();
172    let service_name = &consumer.service_name;
173    let upper_name = service_name.to_uppercase().replace('-', "_");
174
175    format!(
176        r#"//! Configuration module
177//!
178//! Loads configuration from environment variables.
179
180use std::time::Duration;
181
182/// Main configuration
183#[derive(Debug, Clone)]
184pub struct Config {{
185    pub broker: BrokerConfig,
186    pub topics: Vec<String>,
187    pub group_id: String,
188    pub retry: RetryConfig,
189    pub dlq: DlqConfig,
190    pub server: ServerConfig,
191}}
192
193/// Message broker configuration
194#[derive(Debug, Clone)]
195pub struct BrokerConfig {{
196    pub brokers: String,
197    pub security_protocol: String,
198    pub sasl_mechanism: Option<String>,
199    pub sasl_username: Option<String>,
200    pub sasl_password: Option<String>,
201}}
202
203/// Retry configuration
204#[derive(Debug, Clone)]
205pub struct RetryConfig {{
206    pub max_attempts: u32,
207    pub initial_backoff: Duration,
208    pub max_backoff: Duration,
209    pub multiplier: f64,
210}}
211
212/// Dead Letter Queue configuration
213#[derive(Debug, Clone)]
214pub struct DlqConfig {{
215    pub enabled: bool,
216    pub suffix: String,
217}}
218
219/// Server configuration
220#[derive(Debug, Clone)]
221pub struct ServerConfig {{
222    pub health_port: u16,
223    pub metrics_port: u16,
224}}
225
226impl Config {{
227    pub fn from_env() -> Self {{
228        Self {{
229            broker: BrokerConfig {{
230                brokers: std::env::var("{upper_name}_BROKERS")
231                    .unwrap_or_else(|_| "localhost:9092".to_string()),
232                security_protocol: std::env::var("{upper_name}_SECURITY_PROTOCOL")
233                    .unwrap_or_else(|_| "PLAINTEXT".to_string()),
234                sasl_mechanism: std::env::var("{upper_name}_SASL_MECHANISM").ok(),
235                sasl_username: std::env::var("{upper_name}_SASL_USERNAME").ok(),
236                sasl_password: std::env::var("{upper_name}_SASL_PASSWORD").ok(),
237            }},
238            topics: std::env::var("{upper_name}_TOPICS")
239                .unwrap_or_else(|_| "events".to_string())
240                .split(',')
241                .map(|s| s.trim().to_string())
242                .collect(),
243            group_id: std::env::var("{upper_name}_GROUP_ID")
244                .unwrap_or_else(|_| "{service_name}-group".to_string()),
245            retry: RetryConfig {{
246                max_attempts: std::env::var("{upper_name}_RETRY_MAX_ATTEMPTS")
247                    .ok()
248                    .and_then(|v| v.parse().ok())
249                    .unwrap_or({max_attempts}),
250                initial_backoff: Duration::from_millis(
251                    std::env::var("{upper_name}_RETRY_INITIAL_BACKOFF_MS")
252                        .ok()
253                        .and_then(|v| v.parse().ok())
254                        .unwrap_or({initial_backoff_ms}),
255                ),
256                max_backoff: Duration::from_millis(
257                    std::env::var("{upper_name}_RETRY_MAX_BACKOFF_MS")
258                        .ok()
259                        .and_then(|v| v.parse().ok())
260                        .unwrap_or({max_backoff_ms}),
261                ),
262                multiplier: std::env::var("{upper_name}_RETRY_MULTIPLIER")
263                    .ok()
264                    .and_then(|v| v.parse().ok())
265                    .unwrap_or({multiplier}),
266            }},
267            dlq: DlqConfig {{
268                enabled: std::env::var("{upper_name}_DLQ_ENABLED")
269                    .ok()
270                    .and_then(|v| v.parse().ok())
271                    .unwrap_or(true),
272                suffix: std::env::var("{upper_name}_DLQ_SUFFIX")
273                    .unwrap_or_else(|_| ".dlq".to_string()),
274            }},
275            server: ServerConfig {{
276                health_port: std::env::var("{upper_name}_HEALTH_PORT")
277                    .ok()
278                    .and_then(|v| v.parse().ok())
279                    .unwrap_or({health_port}),
280                metrics_port: std::env::var("{upper_name}_METRICS_PORT")
281                    .ok()
282                    .and_then(|v| v.parse().ok())
283                    .unwrap_or({metrics_port}),
284            }},
285        }}
286    }}
287}}
288"#,
289        upper_name = upper_name,
290        service_name = service_name,
291        max_attempts = consumer.retry.max_attempts,
292        initial_backoff_ms = consumer.retry.initial_backoff_ms,
293        max_backoff_ms = consumer.retry.max_backoff_ms,
294        multiplier = consumer.retry.multiplier,
295        health_port = consumer.server.health_port,
296        metrics_port = consumer.server.metrics_port,
297    )
298}
299
300/// Generate error.rs
301pub fn error_rs(config: &ProjectConfig) -> String {
302    let consumer = config.consumer.as_ref().unwrap();
303    let pascal_name = to_pascal_case(&consumer.service_name);
304
305    format!(
306        r#"//! Error types for the consumer service
307
308use thiserror::Error;
309
310/// Consumer error type
311#[derive(Error, Debug)]
312pub enum {pascal_name}Error {{
313    #[error("Broker error: {{0}}")]
314    BrokerError(String),
315
316    #[error("Deserialization error: {{0}}")]
317    DeserializationError(String),
318
319    #[error("Handler error: {{0}}")]
320    HandlerError(String),
321
322    #[error("Idempotency error: {{0}}")]
323    IdempotencyError(String),
324
325    #[error("DLQ error: {{0}}")]
326    DlqError(String),
327
328    #[error("Configuration error: {{0}}")]
329    ConfigError(String),
330}}
331
332/// Result type alias
333pub type Result<T> = std::result::Result<T, {pascal_name}Error>;
334
335impl From<{pascal_name}Error> for tonic::Status {{
336    fn from(e: {pascal_name}Error) -> Self {{
337        tonic::Status::internal(e.to_string())
338    }}
339}}
340"#,
341        pascal_name = pascal_name,
342    )
343}
344
345/// Generate domain/mod.rs
346pub fn domain_mod(_config: &ProjectConfig) -> String {
347    r#"//! Domain layer - Events and handlers
348
349pub mod events;
350pub mod handlers;
351
352pub use events::*;
353pub use handlers::*;
354"#
355    .to_string()
356}
357
358/// Generate domain/events.rs
359pub fn domain_events(_config: &ProjectConfig) -> String {
360    r#"//! Domain events
361
362use serde::{Deserialize, Serialize};
363use chrono::{DateTime, Utc};
364use uuid::Uuid;
365
366/// Base event envelope
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct EventEnvelope<T> {
369    /// Unique event ID (used for idempotency)
370    pub id: Uuid,
371    /// Event type name
372    pub event_type: String,
373    /// Event timestamp
374    pub timestamp: DateTime<Utc>,
375    /// Correlation ID for tracing
376    pub correlation_id: Option<Uuid>,
377    /// Event payload
378    pub payload: T,
379    /// Event metadata
380    pub metadata: EventMetadata,
381}
382
383/// Event metadata
384#[derive(Debug, Clone, Default, Serialize, Deserialize)]
385pub struct EventMetadata {
386    /// Source service
387    pub source: Option<String>,
388    /// Event version
389    pub version: Option<String>,
390    /// Retry count
391    pub retry_count: u32,
392}
393
394impl<T> EventEnvelope<T> {
395    /// Create a new event envelope
396    pub fn new(event_type: impl Into<String>, payload: T) -> Self {
397        Self {
398            id: Uuid::new_v4(),
399            event_type: event_type.into(),
400            timestamp: Utc::now(),
401            correlation_id: None,
402            payload,
403            metadata: EventMetadata::default(),
404        }
405    }
406
407    /// Set correlation ID
408    pub fn with_correlation_id(mut self, id: Uuid) -> Self {
409        self.correlation_id = Some(id);
410        self
411    }
412}
413
414/// Example domain event: User Created
415#[derive(Debug, Clone, Serialize, Deserialize)]
416pub struct UserCreated {
417    pub user_id: Uuid,
418    pub email: String,
419    pub name: String,
420}
421
422/// Example domain event: Order Placed
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct OrderPlaced {
425    pub order_id: Uuid,
426    pub user_id: Uuid,
427    pub total: f64,
428    pub items: Vec<OrderItem>,
429}
430
431/// Order item
432#[derive(Debug, Clone, Serialize, Deserialize)]
433pub struct OrderItem {
434    pub product_id: Uuid,
435    pub quantity: u32,
436    pub price: f64,
437}
438"#
439    .to_string()
440}
441
442/// Generate domain/handlers.rs
443pub fn domain_handlers(config: &ProjectConfig) -> String {
444    let consumer = config.consumer.as_ref().unwrap();
445    let pascal_name = to_pascal_case(&consumer.service_name);
446
447    format!(
448        r#"//! Event handlers
449
450use async_trait::async_trait;
451use tracing::{{info, warn}};
452
453use crate::domain::events::*;
454use crate::error::Result;
455
456/// Event handler trait
457#[async_trait]
458pub trait EventHandler<E>: Send + Sync {{
459    /// Handle an event
460    async fn handle(&self, event: EventEnvelope<E>) -> Result<()>;
461}}
462
463/// User created event handler
464pub struct UserCreatedHandler;
465
466#[async_trait]
467impl EventHandler<UserCreated> for UserCreatedHandler {{
468    async fn handle(&self, event: EventEnvelope<UserCreated>) -> Result<()> {{
469        info!(
470            user_id = %event.payload.user_id,
471            email = %event.payload.email,
472            "Processing UserCreated event"
473        );
474
475        // TODO: Implement your business logic here
476        // Examples:
477        // - Send welcome email
478        // - Create user profile
479        // - Initialize user preferences
480
481        Ok(())
482    }}
483}}
484
485/// Order placed event handler
486pub struct OrderPlacedHandler;
487
488#[async_trait]
489impl EventHandler<OrderPlaced> for OrderPlacedHandler {{
490    async fn handle(&self, event: EventEnvelope<OrderPlaced>) -> Result<()> {{
491        info!(
492            order_id = %event.payload.order_id,
493            user_id = %event.payload.user_id,
494            total = %event.payload.total,
495            "Processing OrderPlaced event"
496        );
497
498        // TODO: Implement your business logic here
499        // Examples:
500        // - Reserve inventory
501        // - Process payment
502        // - Send confirmation email
503
504        Ok(())
505    }}
506}}
507
508/// Handler registry for routing events to handlers
509pub struct {pascal_name}HandlerRegistry {{
510    user_created_handler: UserCreatedHandler,
511    order_placed_handler: OrderPlacedHandler,
512}}
513
514impl Default for {pascal_name}HandlerRegistry {{
515    fn default() -> Self {{
516        Self::new()
517    }}
518}}
519
520impl {pascal_name}HandlerRegistry {{
521    pub fn new() -> Self {{
522        Self {{
523            user_created_handler: UserCreatedHandler,
524            order_placed_handler: OrderPlacedHandler,
525        }}
526    }}
527
528    /// Route and handle an event based on its type
529    pub async fn handle_event(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
530        match event_type {{
531            "UserCreated" => {{
532                let event: EventEnvelope<UserCreated> = serde_json::from_slice(payload)
533                    .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
534                self.user_created_handler.handle(event).await
535            }}
536            "OrderPlaced" => {{
537                let event: EventEnvelope<OrderPlaced> = serde_json::from_slice(payload)
538                    .map_err(|e| crate::error::{pascal_name}Error::DeserializationError(e.to_string()))?;
539                self.order_placed_handler.handle(event).await
540            }}
541            _ => {{
542                warn!(event_type = %event_type, "Unknown event type, skipping");
543                Ok(())
544            }}
545        }}
546    }}
547}}
548"#,
549        pascal_name = pascal_name,
550    )
551}
552
553/// Generate application/mod.rs
554pub fn application_mod(_config: &ProjectConfig) -> String {
555    r#"//! Application layer - Consumer orchestration
556
557pub mod consumer;
558
559pub use consumer::*;
560"#
561    .to_string()
562}
563
564/// Generate application/consumer.rs
565pub fn application_consumer(config: &ProjectConfig) -> String {
566    let consumer = config.consumer.as_ref().unwrap();
567    let pascal_name = to_pascal_case(&consumer.service_name);
568
569    format!(
570        r#"//! Main consumer implementation
571
572use std::sync::Arc;
573use std::time::Duration;
574use async_trait::async_trait;
575use tracing::{{info, warn, error, instrument}};
576
577use crate::config::{{RetryConfig, DlqConfig}};
578use crate::domain::{pascal_name}HandlerRegistry;
579use crate::error::{{Result, {pascal_name}Error}};
580use crate::infrastructure::{{MessageBroker, IdempotencyStore}};
581
582/// Message from the broker
583#[derive(Debug, Clone)]
584pub struct Message {{
585    /// Message key
586    pub key: Option<String>,
587    /// Message payload
588    pub payload: Vec<u8>,
589    /// Message headers
590    pub headers: Vec<(String, String)>,
591    /// Topic
592    pub topic: String,
593    /// Partition
594    pub partition: i32,
595    /// Offset
596    pub offset: i64,
597}}
598
599impl Message {{
600    /// Get the event ID from headers
601    pub fn event_id(&self) -> Option<String> {{
602        self.headers
603            .iter()
604            .find(|(k, _)| k == "event_id")
605            .map(|(_, v)| v.clone())
606    }}
607
608    /// Get the event type from headers
609    pub fn event_type(&self) -> Option<String> {{
610        self.headers
611            .iter()
612            .find(|(k, _)| k == "event_type")
613            .map(|(_, v)| v.clone())
614    }}
615}}
616
617/// Main consumer service
618pub struct {pascal_name}Consumer<B: MessageBroker, I: IdempotencyStore> {{
619    broker: B,
620    idempotency_store: Arc<I>,
621    handler_registry: {pascal_name}HandlerRegistry,
622    retry_config: RetryConfig,
623    dlq_config: DlqConfig,
624}}
625
626impl<B: MessageBroker, I: IdempotencyStore> {pascal_name}Consumer<B, I> {{
627    pub fn new(
628        broker: B,
629        idempotency_store: Arc<I>,
630        retry_config: RetryConfig,
631        dlq_config: DlqConfig,
632    ) -> Self {{
633        Self {{
634            broker,
635            idempotency_store,
636            handler_registry: {pascal_name}HandlerRegistry::new(),
637            retry_config,
638            dlq_config,
639        }}
640    }}
641
642    /// Run the consumer loop
643    pub async fn run(&self, topics: &[String]) -> Result<()> {{
644        info!("Subscribing to topics: {{:?}}", topics);
645        self.broker.subscribe(topics).await?;
646
647        loop {{
648            match self.broker.poll(Duration::from_secs(1)).await {{
649                Ok(Some(message)) => {{
650                    if let Err(e) = self.process_message(message).await {{
651                        error!("Error processing message: {{}}", e);
652                    }}
653                }}
654                Ok(None) => {{
655                    // No message available, continue polling
656                }}
657                Err(e) => {{
658                    error!("Error polling for messages: {{}}", e);
659                    tokio::time::sleep(Duration::from_secs(1)).await;
660                }}
661            }}
662        }}
663    }}
664
665    #[instrument(skip(self, message), fields(topic = %message.topic, partition = %message.partition, offset = %message.offset))]
666    async fn process_message(&self, message: Message) -> Result<()> {{
667        let event_id = message.event_id().unwrap_or_else(|| {{
668            format!("{{}}:{{}}:{{}}", message.topic, message.partition, message.offset)
669        }});
670
671        // Check idempotency
672        if self.idempotency_store.exists(&event_id).await? {{
673            info!(event_id = %event_id, "Event already processed, skipping");
674            self.broker.commit(&message).await?;
675            return Ok(());
676        }}
677
678        let event_type = message.event_type().unwrap_or_else(|| "Unknown".to_string());
679
680        // Process with retry
681        let result = self.process_with_retry(&event_type, &message.payload).await;
682
683        match result {{
684            Ok(()) => {{
685                // Mark as processed
686                self.idempotency_store.mark_processed(&event_id).await?;
687                self.broker.commit(&message).await?;
688                info!(event_id = %event_id, "Event processed successfully");
689            }}
690            Err(e) => {{
691                error!(event_id = %event_id, error = %e, "Failed to process event");
692
693                if self.dlq_config.enabled {{
694                    // Send to DLQ
695                    let dlq_topic = format!("{{}}{{}}", message.topic, self.dlq_config.suffix);
696                    self.broker.send_to_dlq(&dlq_topic, &message).await?;
697                    self.broker.commit(&message).await?;
698                    warn!(event_id = %event_id, dlq_topic = %dlq_topic, "Event sent to DLQ");
699                }} else {{
700                    return Err(e);
701                }}
702            }}
703        }}
704
705        Ok(())
706    }}
707
708    async fn process_with_retry(&self, event_type: &str, payload: &[u8]) -> Result<()> {{
709        let mut attempts = 0;
710        let mut backoff = self.retry_config.initial_backoff;
711
712        loop {{
713            attempts += 1;
714
715            match self.handler_registry.handle_event(event_type, payload).await {{
716                Ok(()) => return Ok(()),
717                Err(e) if attempts >= self.retry_config.max_attempts => {{
718                    return Err(e);
719                }}
720                Err(e) => {{
721                    warn!(
722                        attempt = attempts,
723                        max_attempts = self.retry_config.max_attempts,
724                        error = %e,
725                        "Handler failed, retrying"
726                    );
727
728                    tokio::time::sleep(backoff).await;
729                    backoff = std::cmp::min(
730                        Duration::from_secs_f64(backoff.as_secs_f64() * self.retry_config.multiplier),
731                        self.retry_config.max_backoff,
732                    );
733                }}
734            }}
735        }}
736    }}
737}}
738"#,
739        pascal_name = pascal_name,
740    )
741}
742
743/// Generate infrastructure/mod.rs
744pub fn infrastructure_mod(_config: &ProjectConfig) -> String {
745    r#"//! Infrastructure layer - Message brokers and stores
746
747mod broker;
748mod idempotency;
749mod health;
750
751pub use broker::*;
752pub use idempotency::*;
753pub use health::*;
754"#
755    .to_string()
756}
757
758/// Generate infrastructure/broker.rs
759pub fn infrastructure_broker(config: &ProjectConfig) -> String {
760    let consumer = config.consumer.as_ref().unwrap();
761    let pascal_name = to_pascal_case(&consumer.service_name);
762
763    format!(
764        r#"//! Message broker implementations
765
766use std::time::Duration;
767use async_trait::async_trait;
768use tracing::{{info, debug}};
769
770use crate::config::BrokerConfig;
771use crate::application::Message;
772use crate::error::{{Result, {pascal_name}Error}};
773
774/// Message broker trait
775#[async_trait]
776pub trait MessageBroker: Send + Sync {{
777    /// Subscribe to topics
778    async fn subscribe(&self, topics: &[String]) -> Result<()>;
779
780    /// Poll for messages
781    async fn poll(&self, timeout: Duration) -> Result<Option<Message>>;
782
783    /// Commit message offset
784    async fn commit(&self, message: &Message) -> Result<()>;
785
786    /// Send message to DLQ
787    async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()>;
788}}
789
790/// Kafka message broker implementation
791pub struct KafkaMessageBroker {{
792    // In a real implementation, this would hold the rdkafka consumer
793    _config: BrokerConfig,
794}}
795
796impl KafkaMessageBroker {{
797    pub async fn new(config: &BrokerConfig) -> Result<Self> {{
798        info!("Connecting to Kafka brokers: {{}}", config.brokers);
799
800        // In a real implementation:
801        // let consumer: StreamConsumer = ClientConfig::new()
802        //     .set("bootstrap.servers", &config.brokers)
803        //     .set("group.id", &group_id)
804        //     .set("enable.partition.eof", "false")
805        //     .set("session.timeout.ms", "6000")
806        //     .set("enable.auto.commit", "false")
807        //     .create()?;
808
809        Ok(Self {{
810            _config: config.clone(),
811        }})
812    }}
813}}
814
815#[async_trait]
816impl MessageBroker for KafkaMessageBroker {{
817    async fn subscribe(&self, topics: &[String]) -> Result<()> {{
818        info!("Subscribing to topics: {{:?}}", topics);
819        // In a real implementation:
820        // self.consumer.subscribe(&topics.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;
821        Ok(())
822    }}
823
824    async fn poll(&self, timeout: Duration) -> Result<Option<Message>> {{
825        debug!("Polling for messages with timeout: {{:?}}", timeout);
826
827        // In a real implementation:
828        // match self.consumer.poll(timeout) {{
829        //     Some(Ok(msg)) => {{
830        //         let headers = msg.headers()
831        //             .map(|h| {{
832        //                 (0..h.count())
833        //                     .filter_map(|i| {{
834        //                         h.get(i).map(|header| {{
835        //                             (header.key.to_string(), String::from_utf8_lossy(header.value.unwrap_or_default()).to_string())
836        //                         }})
837        //                     }})
838        //                     .collect()
839        //             }})
840        //             .unwrap_or_default();
841        //
842        //         Ok(Some(Message {{
843        //             key: msg.key().map(|k| String::from_utf8_lossy(k).to_string()),
844        //             payload: msg.payload().unwrap_or_default().to_vec(),
845        //             headers,
846        //             topic: msg.topic().to_string(),
847        //             partition: msg.partition(),
848        //             offset: msg.offset(),
849        //         }}))
850        //     }}
851        //     Some(Err(e)) => Err({pascal_name}Error::BrokerError(e.to_string())),
852        //     None => Ok(None),
853        // }}
854
855        // Placeholder: simulate no messages
856        tokio::time::sleep(timeout).await;
857        Ok(None)
858    }}
859
860    async fn commit(&self, message: &Message) -> Result<()> {{
861        debug!(
862            topic = %message.topic,
863            partition = %message.partition,
864            offset = %message.offset,
865            "Committing offset"
866        );
867        // In a real implementation:
868        // self.consumer.commit_message(&message, CommitMode::Async)?;
869        Ok(())
870    }}
871
872    async fn send_to_dlq(&self, dlq_topic: &str, message: &Message) -> Result<()> {{
873        info!(
874            dlq_topic = %dlq_topic,
875            original_topic = %message.topic,
876            "Sending message to DLQ"
877        );
878        // In a real implementation, use a producer to send to DLQ
879        Ok(())
880    }}
881}}
882"#,
883        pascal_name = pascal_name,
884    )
885}
886
887/// Generate infrastructure/idempotency.rs
888pub fn infrastructure_idempotency(config: &ProjectConfig) -> String {
889    let consumer = config.consumer.as_ref().unwrap();
890    let pascal_name = to_pascal_case(&consumer.service_name);
891
892    format!(
893        r#"//! Idempotency store implementations
894
895use std::collections::HashSet;
896use std::sync::RwLock;
897use async_trait::async_trait;
898
899use crate::error::{{Result, {pascal_name}Error}};
900
901/// Idempotency store trait
902#[async_trait]
903pub trait IdempotencyStore: Send + Sync {{
904    /// Check if an event has already been processed
905    async fn exists(&self, event_id: &str) -> Result<bool>;
906
907    /// Mark an event as processed
908    async fn mark_processed(&self, event_id: &str) -> Result<()>;
909}}
910
911/// In-memory idempotency store (for development/testing)
912pub struct InMemoryIdempotencyStore {{
913    processed: RwLock<HashSet<String>>,
914}}
915
916impl InMemoryIdempotencyStore {{
917    pub fn new() -> Self {{
918        Self {{
919            processed: RwLock::new(HashSet::new()),
920        }}
921    }}
922}}
923
924impl Default for InMemoryIdempotencyStore {{
925    fn default() -> Self {{
926        Self::new()
927    }}
928}}
929
930#[async_trait]
931impl IdempotencyStore for InMemoryIdempotencyStore {{
932    async fn exists(&self, event_id: &str) -> Result<bool> {{
933        let guard = self.processed.read()
934            .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
935        Ok(guard.contains(event_id))
936    }}
937
938    async fn mark_processed(&self, event_id: &str) -> Result<()> {{
939        let mut guard = self.processed.write()
940            .map_err(|e| {pascal_name}Error::IdempotencyError(e.to_string()))?;
941        guard.insert(event_id.to_string());
942        Ok(())
943    }}
944}}
945
946// TODO: Add Redis implementation
947// pub struct RedisIdempotencyStore {{ ... }}
948
949// TODO: Add PostgreSQL implementation
950// pub struct PostgresIdempotencyStore {{ ... }}
951"#,
952        pascal_name = pascal_name,
953    )
954}
955
956/// Generate infrastructure/health.rs
957pub fn infrastructure_health(_config: &ProjectConfig) -> String {
958    r#"//! Health check server
959
960use std::net::SocketAddr;
961use axum::{routing::get, Router, Json};
962use serde::Serialize;
963use tracing::info;
964
965/// Health response
966#[derive(Serialize)]
967pub struct HealthResponse {
968    pub status: String,
969    pub service: String,
970}
971
972/// Health check server
973pub struct HealthServer {
974    port: u16,
975}
976
977impl HealthServer {
978    pub fn new(port: u16) -> Self {
979        Self { port }
980    }
981
982    pub async fn run(&self) -> anyhow::Result<()> {
983        let app = Router::new()
984            .route("/health", get(health_handler))
985            .route("/ready", get(ready_handler));
986
987        let addr: SocketAddr = ([0, 0, 0, 0], self.port).into();
988        info!("Health server listening on {}", addr);
989
990        let listener = tokio::net::TcpListener::bind(addr).await?;
991        axum::serve(listener, app).await?;
992
993        Ok(())
994    }
995}
996
997async fn health_handler() -> Json<HealthResponse> {
998    Json(HealthResponse {
999        status: "healthy".to_string(),
1000        service: env!("CARGO_PKG_NAME").to_string(),
1001    })
1002}
1003
1004async fn ready_handler() -> Json<HealthResponse> {
1005    // TODO: Check actual readiness (broker connection, etc.)
1006    Json(HealthResponse {
1007        status: "ready".to_string(),
1008        service: env!("CARGO_PKG_NAME").to_string(),
1009    })
1010}
1011"#
1012    .to_string()
1013}
1014
1015/// Generate README.md
1016pub fn readme(config: &ProjectConfig) -> String {
1017    let consumer = config.consumer.as_ref().unwrap();
1018
1019    format!(
1020        r#"# {display_name}
1021
1022An event consumer service built with AllFrame.
1023
1024## Features
1025
1026- **{broker} Consumer**: Event-driven message processing
1027- **Idempotency**: Duplicate event detection
1028- **Dead Letter Queue**: Failed message handling
1029- **Retry with Backoff**: Configurable retry strategy
1030- **Health Checks**: Kubernetes-ready endpoints
1031
1032## Configuration
1033
1034Set the following environment variables:
1035
1036```bash
1037# Broker configuration
1038{upper_name}_BROKERS=localhost:9092
1039{upper_name}_GROUP_ID={service_name}-group
1040{upper_name}_TOPICS=events
1041
1042# Retry configuration
1043{upper_name}_RETRY_MAX_ATTEMPTS=3
1044{upper_name}_RETRY_INITIAL_BACKOFF_MS=100
1045{upper_name}_RETRY_MAX_BACKOFF_MS=10000
1046
1047# DLQ configuration
1048{upper_name}_DLQ_ENABLED=true
1049{upper_name}_DLQ_SUFFIX=.dlq
1050
1051# Server configuration
1052{upper_name}_HEALTH_PORT=8081
1053{upper_name}_METRICS_PORT=9090
1054```
1055
1056## Running
1057
1058```bash
1059cargo run
1060```
1061
1062## Health Endpoints
1063
1064- `GET /health` - Liveness check
1065- `GET /ready` - Readiness check
1066
1067## Adding Event Handlers
1068
10691. Define your event type in `src/domain/events.rs`
10702. Create a handler in `src/domain/handlers.rs`
10713. Register it in the `HandlerRegistry`
1072
1073## Generated by AllFrame Ignite
1074"#,
1075        display_name = consumer.display_name,
1076        broker = consumer.broker,
1077        service_name = consumer.service_name,
1078        upper_name = consumer.service_name.to_uppercase().replace('-', "_"),
1079    )
1080}
1081
1082/// Generate Dockerfile
1083pub fn dockerfile(config: &ProjectConfig) -> String {
1084    let name = &config.name;
1085
1086    format!(
1087        r#"# Build stage
1088FROM rust:1.86-slim as builder
1089
1090WORKDIR /app
1091COPY . .
1092RUN cargo build --release
1093
1094# Runtime stage
1095FROM debian:bookworm-slim
1096
1097RUN apt-get update && apt-get install -y \
1098    ca-certificates \
1099    libssl3 \
1100    && rm -rf /var/lib/apt/lists/*
1101
1102WORKDIR /app
1103COPY --from=builder /app/target/release/{name} /app/{name}
1104
1105ENV RUST_LOG=info
1106
1107EXPOSE 8081
1108
1109CMD ["/app/{name}"]
1110"#,
1111        name = name,
1112    )
1113}
1114
1115#[cfg(test)]
1116mod tests {
1117    use super::*;
1118
1119    #[test]
1120    fn test_to_pascal_case() {
1121        assert_eq!(to_pascal_case("order-processor"), "OrderProcessor");
1122        assert_eq!(to_pascal_case("user_handler"), "UserHandler");
1123        assert_eq!(to_pascal_case("simple"), "Simple");
1124    }
1125}