Skip to main content

shodh_memory/handlers/
integrations.rs

1//! External Integration Handlers
2//!
3//! Handlers for Linear and GitHub webhook integrations and bulk sync.
4
5use axum::{extract::State, http::HeaderMap, response::Json};
6
7use super::state::MultiUserMemoryManager;
8use crate::errors::{AppError, ValidationErrorExt};
9use crate::integrations;
10use crate::memory::{self, Experience, ExperienceType};
11use crate::validation;
12use std::sync::Arc;
13
14type AppState = Arc<MultiUserMemoryManager>;
15
16/// POST /webhook/linear - Linear webhook receiver
17#[tracing::instrument(skip(state, body, headers))]
18pub async fn linear_webhook(
19    State(state): State<AppState>,
20    headers: HeaderMap,
21    body: axum::body::Bytes,
22) -> Result<Json<serde_json::Value>, AppError> {
23    use integrations::linear::LinearWebhook;
24
25    let signing_secret = std::env::var("LINEAR_WEBHOOK_SECRET").ok();
26    let has_secret = signing_secret.is_some();
27    let webhook = LinearWebhook::new(signing_secret);
28
29    // Verify webhook signature
30    let signature = headers
31        .get("linear-signature")
32        .and_then(|h| h.to_str().ok());
33
34    match (has_secret, signature) {
35        (true, Some(sig)) => {
36            if !webhook
37                .verify_signature(&body, sig)
38                .map_err(AppError::Internal)?
39            {
40                return Err(AppError::InvalidInput {
41                    field: "signature".to_string(),
42                    reason: "Invalid webhook signature".to_string(),
43                });
44            }
45        }
46        (true, None) => {
47            return Err(AppError::InvalidInput {
48                field: "linear-signature".to_string(),
49                reason: "Missing required webhook signature header".to_string(),
50            });
51        }
52        (false, _) => {
53            tracing::warn!("No LINEAR_WEBHOOK_SECRET configured, skipping signature verification");
54        }
55    }
56
57    let payload = webhook.parse_payload(&body).map_err(AppError::Internal)?;
58
59    if payload.entity_type != "Issue" {
60        return Ok(Json(serde_json::json!({
61            "status": "ignored",
62            "reason": "Only Issue events are processed"
63        })));
64    }
65
66    if payload.action == "remove" {
67        return Ok(Json(serde_json::json!({
68            "status": "acknowledged",
69            "action": "remove"
70        })));
71    }
72
73    let external_id = match &payload.data.identifier {
74        Some(id) => format!("linear:{}", id),
75        None => format!("linear:{}", payload.data.id),
76    };
77
78    let content = LinearWebhook::issue_to_content(&payload.data);
79    let tags = LinearWebhook::issue_to_tags(&payload.data);
80    let change_type = LinearWebhook::determine_change_type(&payload.action, &payload.data);
81
82    let user_id =
83        std::env::var("LINEAR_SYNC_USER_ID").unwrap_or_else(|_| "linear-sync".to_string());
84
85    let experience = Experience {
86        content: content.clone(),
87        experience_type: ExperienceType::Task,
88        entities: tags.clone(),
89        ..Default::default()
90    };
91
92    let change_type_enum = match change_type.as_str() {
93        "created" => memory::types::ChangeType::Created,
94        "status_changed" => memory::types::ChangeType::StatusChanged,
95        "tags_updated" => memory::types::ChangeType::TagsUpdated,
96        _ => memory::types::ChangeType::ContentUpdated,
97    };
98
99    let memory_system = state
100        .get_user_memory(&user_id)
101        .map_err(AppError::Internal)?;
102
103    let (memory_id, was_update) = {
104        let memory = memory_system.clone();
105        let ext_id = external_id.clone();
106        let exp = experience.clone();
107        let ct = change_type_enum;
108        let actor_name = payload
109            .actor
110            .as_ref()
111            .and_then(|a| a.name.clone())
112            .unwrap_or_else(|| "linear-webhook".to_string());
113
114        tokio::task::spawn_blocking(move || {
115            let memory_guard = memory.read();
116            memory_guard.upsert(ext_id, exp, ct, Some(actor_name), None)
117        })
118        .await
119        .map_err(|e| AppError::Internal(anyhow::anyhow!("Blocking task panicked: {e}")))?
120        .map_err(AppError::Internal)?
121    };
122
123    Ok(Json(serde_json::json!({
124        "status": "success",
125        "id": memory_id.0.to_string(),
126        "external_id": external_id,
127        "was_update": was_update,
128        "action": payload.action
129    })))
130}
131
132/// POST /api/sync/linear - Bulk sync Linear issues
133#[tracing::instrument(skip(state, req), fields(user_id = %req.user_id))]
134pub async fn linear_sync(
135    State(state): State<AppState>,
136    Json(req): Json<integrations::linear::LinearSyncRequest>,
137) -> Result<Json<integrations::linear::LinearSyncResponse>, AppError> {
138    use integrations::linear::{LinearClient, LinearSyncResponse, LinearWebhook};
139
140    validation::validate_user_id(&req.user_id).map_validation_err("user_id")?;
141
142    if req.api_key.is_empty() {
143        return Err(AppError::InvalidInput {
144            field: "api_key".to_string(),
145            reason: "Linear API key is required".to_string(),
146        });
147    }
148
149    let client = LinearClient::new(req.api_key.clone());
150
151    let issues = client
152        .fetch_issues(
153            req.team_id.as_deref(),
154            req.updated_after.as_deref(),
155            req.limit,
156        )
157        .await
158        .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch Linear issues: {}", e)))?;
159
160    let total = issues.len();
161    let mut created_count = 0;
162    let mut updated_count = 0;
163    let mut error_count = 0;
164    let mut errors = Vec::new();
165
166    let memory_system = state
167        .get_user_memory(&req.user_id)
168        .map_err(AppError::Internal)?;
169
170    for issue in issues {
171        let external_id = match &issue.identifier {
172            Some(id) => format!("linear:{}", id),
173            None => format!("linear:{}", issue.id),
174        };
175
176        let content = LinearWebhook::issue_to_content(&issue);
177        let tags = LinearWebhook::issue_to_tags(&issue);
178
179        let experience = Experience {
180            content,
181            experience_type: ExperienceType::Task,
182            entities: tags,
183            ..Default::default()
184        };
185
186        let result = {
187            let memory = memory_system.clone();
188            let ext_id = external_id.clone();
189            let exp = experience;
190
191            tokio::task::spawn_blocking(move || {
192                let memory_guard = memory.read();
193                memory_guard.upsert(
194                    ext_id,
195                    exp,
196                    memory::types::ChangeType::ContentUpdated,
197                    Some("linear-bulk-sync".to_string()),
198                    None,
199                )
200            })
201            .await
202        };
203
204        match result {
205            Ok(Ok((_, was_update))) => {
206                if was_update {
207                    updated_count += 1;
208                } else {
209                    created_count += 1;
210                }
211            }
212            Ok(Err(e)) => {
213                error_count += 1;
214                errors.push(format!("{}: {}", external_id, e));
215            }
216            Err(e) => {
217                error_count += 1;
218                errors.push(format!("{}: Task panicked: {}", external_id, e));
219            }
220        }
221    }
222
223    Ok(Json(LinearSyncResponse {
224        synced_count: total,
225        created_count,
226        updated_count,
227        error_count,
228        errors,
229    }))
230}
231
232/// POST /webhook/github - GitHub webhook receiver
233#[tracing::instrument(skip(state, body, headers))]
234pub async fn github_webhook(
235    State(state): State<AppState>,
236    headers: HeaderMap,
237    body: axum::body::Bytes,
238) -> Result<Json<serde_json::Value>, AppError> {
239    use integrations::github::GitHubWebhook;
240
241    let webhook_secret = std::env::var("GITHUB_WEBHOOK_SECRET").ok();
242    let has_secret = webhook_secret.is_some();
243    let webhook = GitHubWebhook::new(webhook_secret);
244
245    // Verify webhook signature
246    let signature = headers
247        .get("x-hub-signature-256")
248        .and_then(|h| h.to_str().ok());
249
250    match (has_secret, signature) {
251        (true, Some(sig)) => {
252            if !webhook
253                .verify_signature(&body, sig)
254                .map_err(AppError::Internal)?
255            {
256                return Err(AppError::InvalidInput {
257                    field: "signature".to_string(),
258                    reason: "Invalid webhook signature".to_string(),
259                });
260            }
261        }
262        (true, None) => {
263            return Err(AppError::InvalidInput {
264                field: "x-hub-signature-256".to_string(),
265                reason: "Missing required webhook signature header".to_string(),
266            });
267        }
268        (false, _) => {
269            tracing::warn!("No GITHUB_WEBHOOK_SECRET configured, skipping signature verification");
270        }
271    }
272
273    let event_type = headers
274        .get("x-github-event")
275        .and_then(|h| h.to_str().ok())
276        .unwrap_or("unknown");
277
278    if event_type != "issues" && event_type != "pull_request" {
279        return Ok(Json(serde_json::json!({
280            "status": "ignored",
281            "reason": format!("Only issues and pull_request events are processed, got: {}", event_type)
282        })));
283    }
284
285    let payload = webhook.parse_payload(&body).map_err(AppError::Internal)?;
286
287    let user_id =
288        std::env::var("GITHUB_SYNC_USER_ID").unwrap_or_else(|_| "github-sync".to_string());
289
290    let (external_id, content, tags, change_type) = if let Some(issue) = &payload.issue {
291        let ext_id = GitHubWebhook::issue_external_id(&payload.repository, issue.number);
292        let content = GitHubWebhook::issue_to_content(issue, &payload.repository);
293        let tags = GitHubWebhook::issue_to_tags(issue, &payload.repository);
294        let ct = GitHubWebhook::determine_change_type(&payload.action, false);
295        (ext_id, content, tags, ct)
296    } else if let Some(pr) = &payload.pull_request {
297        let ext_id = GitHubWebhook::pr_external_id(&payload.repository, pr.number);
298        let content = GitHubWebhook::pr_to_content(pr, &payload.repository);
299        let tags = GitHubWebhook::pr_to_tags(pr, &payload.repository);
300        let ct = GitHubWebhook::determine_change_type(&payload.action, true);
301        (ext_id, content, tags, ct)
302    } else {
303        return Ok(Json(serde_json::json!({
304            "status": "ignored",
305            "reason": "No issue or pull_request data in payload"
306        })));
307    };
308
309    let experience = Experience {
310        content: content.clone(),
311        experience_type: ExperienceType::Task,
312        entities: tags.clone(),
313        ..Default::default()
314    };
315
316    let change_type_enum = match change_type.as_str() {
317        "created" => memory::types::ChangeType::Created,
318        "status_changed" => memory::types::ChangeType::StatusChanged,
319        "tags_updated" => memory::types::ChangeType::TagsUpdated,
320        _ => memory::types::ChangeType::ContentUpdated,
321    };
322
323    let memory_system = state
324        .get_user_memory(&user_id)
325        .map_err(AppError::Internal)?;
326
327    let (memory_id, was_update) = {
328        let memory = memory_system.clone();
329        let ext_id = external_id.clone();
330        let exp = experience.clone();
331        let ct = change_type_enum;
332        let actor_name = payload
333            .sender
334            .as_ref()
335            .map(|s| s.login.clone())
336            .unwrap_or_else(|| "github-webhook".to_string());
337
338        tokio::task::spawn_blocking(move || {
339            let memory_guard = memory.read();
340            memory_guard.upsert(ext_id, exp, ct, Some(actor_name), None)
341        })
342        .await
343        .map_err(|e| AppError::Internal(anyhow::anyhow!("Blocking task panicked: {e}")))?
344        .map_err(AppError::Internal)?
345    };
346
347    Ok(Json(serde_json::json!({
348        "status": "success",
349        "id": memory_id.0.to_string(),
350        "external_id": external_id,
351        "was_update": was_update,
352        "action": payload.action,
353        "event_type": event_type
354    })))
355}
356
357/// POST /api/sync/github - Bulk sync GitHub issues and PRs
358#[tracing::instrument(skip(state, req), fields(user_id = %req.user_id))]
359pub async fn github_sync(
360    State(state): State<AppState>,
361    Json(req): Json<integrations::github::GitHubSyncRequest>,
362) -> Result<Json<integrations::github::GitHubSyncResponse>, AppError> {
363    use integrations::github::{GitHubClient, GitHubSyncResponse, GitHubWebhook};
364
365    validation::validate_user_id(&req.user_id).map_validation_err("user_id")?;
366
367    if req.token.is_empty() {
368        return Err(AppError::InvalidInput {
369            field: "token".to_string(),
370            reason: "GitHub token is required".to_string(),
371        });
372    }
373
374    let client = GitHubClient::new(req.token.clone());
375
376    let repo_info = client
377        .get_repository(&req.owner, &req.repo)
378        .await
379        .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to get repository: {}", e)))?;
380
381    let mut issues_synced = 0;
382    let mut prs_synced = 0;
383    let mut created_count = 0;
384    let mut updated_count = 0;
385    let mut error_count = 0;
386    let mut errors = Vec::new();
387
388    let memory_system = state
389        .get_user_memory(&req.user_id)
390        .map_err(AppError::Internal)?;
391
392    // Sync issues
393    if req.sync_issues {
394        let issues = client
395            .fetch_issues(&req.owner, &req.repo, &req.state, req.limit)
396            .await
397            .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch issues: {}", e)))?;
398
399        for issue in issues {
400            let external_id = GitHubWebhook::issue_external_id(&repo_info, issue.number);
401            let content = GitHubWebhook::issue_to_content(&issue, &repo_info);
402            let tags = GitHubWebhook::issue_to_tags(&issue, &repo_info);
403
404            let experience = Experience {
405                content,
406                experience_type: ExperienceType::Task,
407                entities: tags,
408                ..Default::default()
409            };
410
411            let result = {
412                let memory = memory_system.clone();
413                let ext_id = external_id.clone();
414                let exp = experience;
415
416                tokio::task::spawn_blocking(move || {
417                    let memory_guard = memory.read();
418                    memory_guard.upsert(
419                        ext_id,
420                        exp,
421                        memory::types::ChangeType::ContentUpdated,
422                        Some("github-bulk-sync".to_string()),
423                        None,
424                    )
425                })
426                .await
427            };
428
429            match result {
430                Ok(Ok((_, was_update))) => {
431                    issues_synced += 1;
432                    if was_update {
433                        updated_count += 1;
434                    } else {
435                        created_count += 1;
436                    }
437                }
438                Ok(Err(e)) => {
439                    error_count += 1;
440                    errors.push(format!("{}: {}", external_id, e));
441                }
442                Err(e) => {
443                    error_count += 1;
444                    errors.push(format!("{}: {}", external_id, e));
445                }
446            }
447        }
448    }
449
450    // Sync PRs
451    if req.sync_prs {
452        let prs = client
453            .fetch_pull_requests(&req.owner, &req.repo, &req.state, req.limit)
454            .await
455            .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch PRs: {}", e)))?;
456
457        for pr in prs {
458            let external_id = GitHubWebhook::pr_external_id(&repo_info, pr.number);
459            let content = GitHubWebhook::pr_to_content(&pr, &repo_info);
460            let tags = GitHubWebhook::pr_to_tags(&pr, &repo_info);
461
462            let experience = Experience {
463                content,
464                experience_type: ExperienceType::Task,
465                entities: tags,
466                ..Default::default()
467            };
468
469            let result = {
470                let memory = memory_system.clone();
471                let ext_id = external_id.clone();
472                let exp = experience;
473
474                tokio::task::spawn_blocking(move || {
475                    let memory_guard = memory.read();
476                    memory_guard.upsert(
477                        ext_id,
478                        exp,
479                        memory::types::ChangeType::ContentUpdated,
480                        Some("github-bulk-sync".to_string()),
481                        None,
482                    )
483                })
484                .await
485            };
486
487            match result {
488                Ok(Ok((_, was_update))) => {
489                    prs_synced += 1;
490                    if was_update {
491                        updated_count += 1;
492                    } else {
493                        created_count += 1;
494                    }
495                }
496                Ok(Err(e)) => {
497                    error_count += 1;
498                    errors.push(format!("{}: {}", external_id, e));
499                }
500                Err(e) => {
501                    error_count += 1;
502                    errors.push(format!("{}: {}", external_id, e));
503                }
504            }
505        }
506    }
507
508    Ok(Json(GitHubSyncResponse {
509        synced_count: issues_synced + prs_synced,
510        issues_synced,
511        prs_synced,
512        commits_synced: 0,
513        created_count,
514        updated_count,
515        error_count,
516        errors,
517    }))
518}