Skip to main content

github_bot_sdk/events/
processor.rs

1//! Event processor for converting raw webhooks to normalized events.
2
3use serde_json::Value;
4
5use crate::client::Repository;
6use crate::error::EventError;
7
8use super::{EntityType, EventEnvelope, EventId, EventMetadata, EventPayload};
9
10/// Event processor configuration.
11///
12/// Controls how webhook events are processed, validated, and normalized.
13#[derive(Debug, Clone)]
14pub struct ProcessorConfig {
15    /// Enable webhook signature validation
16    pub enable_signature_validation: bool,
17
18    /// Enable session correlation for ordered processing
19    pub enable_session_correlation: bool,
20
21    /// Strategy for generating session IDs
22    pub session_id_strategy: SessionIdStrategy,
23
24    /// Maximum allowed payload size in bytes
25    pub max_payload_size: usize,
26
27    /// Trace sampling rate (0.0 to 1.0)
28    pub trace_sampling_rate: f64,
29}
30
31impl Default for ProcessorConfig {
32    fn default() -> Self {
33        Self {
34            enable_signature_validation: true,
35            enable_session_correlation: true,
36            session_id_strategy: SessionIdStrategy::Entity,
37            max_payload_size: 1024 * 1024, // 1MB
38            trace_sampling_rate: 0.1,
39        }
40    }
41}
42
43/// Strategy for generating session IDs for ordered processing.
44#[derive(Debug, Clone)]
45pub enum SessionIdStrategy {
46    /// No session IDs generated
47    None,
48
49    /// Entity-based session IDs (e.g., "pr-owner/repo-123")
50    Entity,
51
52    /// Repository-based session IDs (e.g., "repo-owner/name")
53    Repository,
54
55    /// Custom session ID generation function
56    Custom(fn(&EventEnvelope) -> Option<String>),
57}
58
59/// Processes raw GitHub webhooks into normalized event envelopes.
60///
61/// The event processor handles:
62/// - Signature validation (optional)
63/// - JSON parsing and validation
64/// - Entity extraction and classification
65/// - Session ID generation for ordering
66/// - Metadata population
67///
68/// # Examples
69///
70/// ```rust,no_run
71/// use github_bot_sdk::events::{EventProcessor, ProcessorConfig};
72///
73/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
74/// let config = ProcessorConfig::default();
75/// let processor = EventProcessor::new(config);
76///
77/// let envelope = processor.process_webhook(
78///     "pull_request",
79///     b"{\"action\":\"opened\",\"number\":1}",
80///     Some("delivery-123"),
81/// ).await?;
82///
83/// println!("Processed event: {}", envelope.event_id);
84/// # Ok(())
85/// # }
86/// ```
87pub struct EventProcessor {
88    config: ProcessorConfig,
89}
90
91impl EventProcessor {
92    /// Create a new event processor with the given configuration.
93    pub fn new(config: ProcessorConfig) -> Self {
94        Self { config }
95    }
96
97    /// Process a raw webhook into a normalized event envelope.
98    ///
99    /// # Arguments
100    ///
101    /// * `event_type` - GitHub event type (from X-GitHub-Event header)
102    /// * `payload` - Raw webhook payload bytes
103    /// * `delivery_id` - GitHub delivery ID (from X-GitHub-Delivery header)
104    ///
105    /// # Returns
106    ///
107    /// A normalized `EventEnvelope` or an error if processing fails.
108    ///
109    /// # Errors
110    ///
111    /// Returns `EventError` if:
112    /// - Payload exceeds maximum size
113    /// - Payload is not valid JSON
114    /// - Required fields are missing
115    /// - Event type is unsupported
116    pub async fn process_webhook(
117        &self,
118        event_type: &str,
119        payload: &[u8],
120        delivery_id: Option<&str>,
121    ) -> Result<EventEnvelope, EventError> {
122        // Check payload size
123        if payload.len() > self.config.max_payload_size {
124            return Err(EventError::PayloadTooLarge {
125                size: payload.len(),
126                max: self.config.max_payload_size,
127            });
128        }
129
130        // Parse JSON payload
131        let json_payload: Value = serde_json::from_slice(payload)?;
132
133        // Extract repository information
134        let repository =
135            json_payload
136                .get("repository")
137                .ok_or_else(|| EventError::MissingField {
138                    field: "repository".to_string(),
139                })?;
140
141        let repository: Repository = serde_json::from_value(repository.clone())?;
142
143        // Extract entity information
144        let (entity_type, entity_id) = self.extract_entity_info(event_type, &json_payload)?;
145
146        // Create event payload wrapper
147        let event_payload = EventPayload::new(json_payload);
148
149        // Create event ID
150        let event_id = if let Some(delivery_id) = delivery_id {
151            EventId::from_github_delivery(delivery_id)
152        } else {
153            EventId::new()
154        };
155
156        // Create metadata
157        let metadata = EventMetadata {
158            delivery_id: delivery_id.map(|s| s.to_string()),
159            signature_valid: !self.config.enable_signature_validation, // Default if not validated
160            ..Default::default()
161        };
162
163        // Create envelope
164        let mut envelope = EventEnvelope {
165            event_id,
166            event_type: event_type.to_string(),
167            repository,
168            entity_type,
169            entity_id: entity_id.clone(),
170            session_id: None,
171            payload: event_payload,
172            metadata,
173            trace_context: None,
174        };
175
176        // Generate session ID if enabled
177        if self.config.enable_session_correlation {
178            let session_id =
179                self.generate_session_id(&envelope.entity_type, &entity_id, &envelope.repository);
180            envelope.session_id = session_id;
181        }
182
183        Ok(envelope)
184    }
185
186    /// Extract entity information from the payload.
187    ///
188    /// Determines the primary entity type and ID for session correlation.
189    pub fn extract_entity_info(
190        &self,
191        event_type: &str,
192        payload: &Value,
193    ) -> Result<(EntityType, Option<String>), EventError> {
194        let entity_type = EntityType::from_event_type(event_type);
195
196        // Extract entity ID based on event type
197        let entity_id = match event_type {
198            "pull_request" | "pull_request_review" | "pull_request_review_comment" => {
199                // For PR events, extract PR number
200                payload
201                    .get("number")
202                    .or_else(|| payload.get("pull_request").and_then(|pr| pr.get("number")))
203                    .and_then(|v| v.as_u64())
204                    .map(|n| n.to_string())
205            }
206            "issues" | "issue_comment" => {
207                // For issue events, extract issue number
208                payload
209                    .get("issue")
210                    .and_then(|issue| issue.get("number"))
211                    .and_then(|v| v.as_u64())
212                    .map(|n| n.to_string())
213            }
214            "push" | "create" | "delete" => {
215                // For branch events, extract ref name
216                payload
217                    .get("ref")
218                    .and_then(|v| v.as_str())
219                    .map(|r| r.to_string())
220            }
221            "check_run" => {
222                // For check run events, extract check run ID
223                payload
224                    .get("check_run")
225                    .and_then(|cr| cr.get("id"))
226                    .and_then(|v| v.as_u64())
227                    .map(|n| n.to_string())
228            }
229            "check_suite" => {
230                // For check suite events, extract check suite ID
231                payload
232                    .get("check_suite")
233                    .and_then(|cs| cs.get("id"))
234                    .and_then(|v| v.as_u64())
235                    .map(|n| n.to_string())
236            }
237            "release" => {
238                // For release events, extract release ID
239                payload
240                    .get("release")
241                    .and_then(|r| r.get("id"))
242                    .and_then(|v| v.as_u64())
243                    .map(|n| n.to_string())
244            }
245            "deployment" | "deployment_status" => {
246                // For deployment events, extract deployment ID
247                payload
248                    .get("deployment")
249                    .and_then(|d| d.get("id"))
250                    .and_then(|v| v.as_u64())
251                    .map(|n| n.to_string())
252            }
253            _ => {
254                // For other events, no specific entity ID
255                None
256            }
257        };
258
259        Ok((entity_type, entity_id))
260    }
261
262    /// Generate a session ID for ordered processing.
263    ///
264    /// Uses the configured strategy to create session IDs that group
265    /// related events together for sequential processing.
266    pub fn generate_session_id(
267        &self,
268        entity_type: &EntityType,
269        entity_id: &Option<String>,
270        repository: &Repository,
271    ) -> Option<String> {
272        match &self.config.session_id_strategy {
273            SessionIdStrategy::None => None,
274            SessionIdStrategy::Entity => {
275                // Generate entity-based session ID
276                if let Some(id) = entity_id {
277                    match entity_type {
278                        EntityType::PullRequest => {
279                            Some(format!("pr-{}-{}", repository.full_name, id))
280                        }
281                        EntityType::Issue => Some(format!("issue-{}-{}", repository.full_name, id)),
282                        EntityType::Branch => {
283                            Some(format!("branch-{}-{}", repository.full_name, id))
284                        }
285                        EntityType::CheckRun => {
286                            Some(format!("check-run-{}-{}", repository.full_name, id))
287                        }
288                        EntityType::CheckSuite => {
289                            Some(format!("check-suite-{}-{}", repository.full_name, id))
290                        }
291                        EntityType::Release => {
292                            Some(format!("release-{}-{}", repository.full_name, id))
293                        }
294                        EntityType::Deployment => {
295                            Some(format!("deployment-{}-{}", repository.full_name, id))
296                        }
297                        _ => None,
298                    }
299                } else {
300                    None
301                }
302            }
303            SessionIdStrategy::Repository => {
304                // Generate repository-based session ID
305                Some(format!("repo-{}", repository.full_name))
306            }
307            SessionIdStrategy::Custom(f) => {
308                // Use custom function - create temporary envelope for evaluation
309                let temp_envelope = EventEnvelope {
310                    event_id: EventId::new(),
311                    event_type: String::new(),
312                    repository: repository.clone(),
313                    entity_type: entity_type.clone(),
314                    entity_id: entity_id.clone(),
315                    session_id: None,
316                    payload: EventPayload::new(serde_json::Value::Null),
317                    metadata: EventMetadata::default(),
318                    trace_context: None,
319                };
320                f(&temp_envelope)
321            }
322        }
323    }
324}
325
// Unit tests live in the sibling file `processor_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "processor_tests.rs"]
mod tests;