github_bot_sdk/events/processor.rs
1//! Event processor for converting raw webhooks to normalized events.
2
3use serde_json::Value;
4
5use crate::client::Repository;
6use crate::error::EventError;
7
8use super::{EntityType, EventEnvelope, EventId, EventMetadata, EventPayload};
9
10/// Event processor configuration.
11///
12/// Controls how webhook events are processed, validated, and normalized.
13#[derive(Debug, Clone)]
14pub struct ProcessorConfig {
15 /// Enable webhook signature validation
16 pub enable_signature_validation: bool,
17
18 /// Enable session correlation for ordered processing
19 pub enable_session_correlation: bool,
20
21 /// Strategy for generating session IDs
22 pub session_id_strategy: SessionIdStrategy,
23
24 /// Maximum allowed payload size in bytes
25 pub max_payload_size: usize,
26
27 /// Trace sampling rate (0.0 to 1.0)
28 pub trace_sampling_rate: f64,
29}
30
31impl Default for ProcessorConfig {
32 fn default() -> Self {
33 Self {
34 enable_signature_validation: true,
35 enable_session_correlation: true,
36 session_id_strategy: SessionIdStrategy::Entity,
37 max_payload_size: 1024 * 1024, // 1MB
38 trace_sampling_rate: 0.1,
39 }
40 }
41}
42
43/// Strategy for generating session IDs for ordered processing.
44#[derive(Debug, Clone)]
45pub enum SessionIdStrategy {
46 /// No session IDs generated
47 None,
48
49 /// Entity-based session IDs (e.g., "pr-owner/repo-123")
50 Entity,
51
52 /// Repository-based session IDs (e.g., "repo-owner/name")
53 Repository,
54
55 /// Custom session ID generation function
56 Custom(fn(&EventEnvelope) -> Option<String>),
57}
58
59/// Processes raw GitHub webhooks into normalized event envelopes.
60///
61/// The event processor handles:
62/// - Signature validation (optional)
63/// - JSON parsing and validation
64/// - Entity extraction and classification
65/// - Session ID generation for ordering
66/// - Metadata population
67///
68/// # Examples
69///
70/// ```rust,no_run
71/// use github_bot_sdk::events::{EventProcessor, ProcessorConfig};
72///
73/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
74/// let config = ProcessorConfig::default();
75/// let processor = EventProcessor::new(config);
76///
77/// let envelope = processor.process_webhook(
78/// "pull_request",
79/// b"{\"action\":\"opened\",\"number\":1}",
80/// Some("delivery-123"),
81/// ).await?;
82///
83/// println!("Processed event: {}", envelope.event_id);
84/// # Ok(())
85/// # }
86/// ```
87pub struct EventProcessor {
88 config: ProcessorConfig,
89}
90
91impl EventProcessor {
92 /// Create a new event processor with the given configuration.
93 pub fn new(config: ProcessorConfig) -> Self {
94 Self { config }
95 }
96
97 /// Process a raw webhook into a normalized event envelope.
98 ///
99 /// # Arguments
100 ///
101 /// * `event_type` - GitHub event type (from X-GitHub-Event header)
102 /// * `payload` - Raw webhook payload bytes
103 /// * `delivery_id` - GitHub delivery ID (from X-GitHub-Delivery header)
104 ///
105 /// # Returns
106 ///
107 /// A normalized `EventEnvelope` or an error if processing fails.
108 ///
109 /// # Errors
110 ///
111 /// Returns `EventError` if:
112 /// - Payload exceeds maximum size
113 /// - Payload is not valid JSON
114 /// - Required fields are missing
115 /// - Event type is unsupported
116 pub async fn process_webhook(
117 &self,
118 event_type: &str,
119 payload: &[u8],
120 delivery_id: Option<&str>,
121 ) -> Result<EventEnvelope, EventError> {
122 // Check payload size
123 if payload.len() > self.config.max_payload_size {
124 return Err(EventError::PayloadTooLarge {
125 size: payload.len(),
126 max: self.config.max_payload_size,
127 });
128 }
129
130 // Parse JSON payload
131 let json_payload: Value = serde_json::from_slice(payload)?;
132
133 // Extract repository information
134 let repository =
135 json_payload
136 .get("repository")
137 .ok_or_else(|| EventError::MissingField {
138 field: "repository".to_string(),
139 })?;
140
141 let repository: Repository = serde_json::from_value(repository.clone())?;
142
143 // Extract entity information
144 let (entity_type, entity_id) = self.extract_entity_info(event_type, &json_payload)?;
145
146 // Create event payload wrapper
147 let event_payload = EventPayload::new(json_payload);
148
149 // Create event ID
150 let event_id = if let Some(delivery_id) = delivery_id {
151 EventId::from_github_delivery(delivery_id)
152 } else {
153 EventId::new()
154 };
155
156 // Create metadata
157 let metadata = EventMetadata {
158 delivery_id: delivery_id.map(|s| s.to_string()),
159 signature_valid: !self.config.enable_signature_validation, // Default if not validated
160 ..Default::default()
161 };
162
163 // Create envelope
164 let mut envelope = EventEnvelope {
165 event_id,
166 event_type: event_type.to_string(),
167 repository,
168 entity_type,
169 entity_id: entity_id.clone(),
170 session_id: None,
171 payload: event_payload,
172 metadata,
173 trace_context: None,
174 };
175
176 // Generate session ID if enabled
177 if self.config.enable_session_correlation {
178 let session_id =
179 self.generate_session_id(&envelope.entity_type, &entity_id, &envelope.repository);
180 envelope.session_id = session_id;
181 }
182
183 Ok(envelope)
184 }
185
186 /// Extract entity information from the payload.
187 ///
188 /// Determines the primary entity type and ID for session correlation.
189 pub fn extract_entity_info(
190 &self,
191 event_type: &str,
192 payload: &Value,
193 ) -> Result<(EntityType, Option<String>), EventError> {
194 let entity_type = EntityType::from_event_type(event_type);
195
196 // Extract entity ID based on event type
197 let entity_id = match event_type {
198 "pull_request" | "pull_request_review" | "pull_request_review_comment" => {
199 // For PR events, extract PR number
200 payload
201 .get("number")
202 .or_else(|| payload.get("pull_request").and_then(|pr| pr.get("number")))
203 .and_then(|v| v.as_u64())
204 .map(|n| n.to_string())
205 }
206 "issues" | "issue_comment" => {
207 // For issue events, extract issue number
208 payload
209 .get("issue")
210 .and_then(|issue| issue.get("number"))
211 .and_then(|v| v.as_u64())
212 .map(|n| n.to_string())
213 }
214 "push" | "create" | "delete" => {
215 // For branch events, extract ref name
216 payload
217 .get("ref")
218 .and_then(|v| v.as_str())
219 .map(|r| r.to_string())
220 }
221 "check_run" => {
222 // For check run events, extract check run ID
223 payload
224 .get("check_run")
225 .and_then(|cr| cr.get("id"))
226 .and_then(|v| v.as_u64())
227 .map(|n| n.to_string())
228 }
229 "check_suite" => {
230 // For check suite events, extract check suite ID
231 payload
232 .get("check_suite")
233 .and_then(|cs| cs.get("id"))
234 .and_then(|v| v.as_u64())
235 .map(|n| n.to_string())
236 }
237 "release" => {
238 // For release events, extract release ID
239 payload
240 .get("release")
241 .and_then(|r| r.get("id"))
242 .and_then(|v| v.as_u64())
243 .map(|n| n.to_string())
244 }
245 "deployment" | "deployment_status" => {
246 // For deployment events, extract deployment ID
247 payload
248 .get("deployment")
249 .and_then(|d| d.get("id"))
250 .and_then(|v| v.as_u64())
251 .map(|n| n.to_string())
252 }
253 _ => {
254 // For other events, no specific entity ID
255 None
256 }
257 };
258
259 Ok((entity_type, entity_id))
260 }
261
262 /// Generate a session ID for ordered processing.
263 ///
264 /// Uses the configured strategy to create session IDs that group
265 /// related events together for sequential processing.
266 pub fn generate_session_id(
267 &self,
268 entity_type: &EntityType,
269 entity_id: &Option<String>,
270 repository: &Repository,
271 ) -> Option<String> {
272 match &self.config.session_id_strategy {
273 SessionIdStrategy::None => None,
274 SessionIdStrategy::Entity => {
275 // Generate entity-based session ID
276 if let Some(id) = entity_id {
277 match entity_type {
278 EntityType::PullRequest => {
279 Some(format!("pr-{}-{}", repository.full_name, id))
280 }
281 EntityType::Issue => Some(format!("issue-{}-{}", repository.full_name, id)),
282 EntityType::Branch => {
283 Some(format!("branch-{}-{}", repository.full_name, id))
284 }
285 EntityType::CheckRun => {
286 Some(format!("check-run-{}-{}", repository.full_name, id))
287 }
288 EntityType::CheckSuite => {
289 Some(format!("check-suite-{}-{}", repository.full_name, id))
290 }
291 EntityType::Release => {
292 Some(format!("release-{}-{}", repository.full_name, id))
293 }
294 EntityType::Deployment => {
295 Some(format!("deployment-{}-{}", repository.full_name, id))
296 }
297 _ => None,
298 }
299 } else {
300 None
301 }
302 }
303 SessionIdStrategy::Repository => {
304 // Generate repository-based session ID
305 Some(format!("repo-{}", repository.full_name))
306 }
307 SessionIdStrategy::Custom(f) => {
308 // Use custom function - create temporary envelope for evaluation
309 let temp_envelope = EventEnvelope {
310 event_id: EventId::new(),
311 event_type: String::new(),
312 repository: repository.clone(),
313 entity_type: entity_type.clone(),
314 entity_id: entity_id.clone(),
315 session_id: None,
316 payload: EventPayload::new(serde_json::Value::Null),
317 metadata: EventMetadata::default(),
318 trace_context: None,
319 };
320 f(&temp_envelope)
321 }
322 }
323 }
324}
325
// Unit tests for this module live in the sibling file `processor_tests.rs` and are
// compiled only for test builds.
#[cfg(test)]
#[path = "processor_tests.rs"]
mod tests;