1use axum::{extract::State, http::HeaderMap, response::Json};
6
7use super::state::MultiUserMemoryManager;
8use crate::errors::{AppError, ValidationErrorExt};
9use crate::integrations;
10use crate::memory::{self, Experience, ExperienceType};
11use crate::validation;
12use std::sync::Arc;
13
/// Shared handler state: a reference-counted [`MultiUserMemoryManager`].
type AppState = Arc<MultiUserMemoryManager>;
15
16#[tracing::instrument(skip(state, body, headers))]
18pub async fn linear_webhook(
19 State(state): State<AppState>,
20 headers: HeaderMap,
21 body: axum::body::Bytes,
22) -> Result<Json<serde_json::Value>, AppError> {
23 use integrations::linear::LinearWebhook;
24
25 let signing_secret = std::env::var("LINEAR_WEBHOOK_SECRET").ok();
26 let has_secret = signing_secret.is_some();
27 let webhook = LinearWebhook::new(signing_secret);
28
29 let signature = headers
31 .get("linear-signature")
32 .and_then(|h| h.to_str().ok());
33
34 match (has_secret, signature) {
35 (true, Some(sig)) => {
36 if !webhook
37 .verify_signature(&body, sig)
38 .map_err(AppError::Internal)?
39 {
40 return Err(AppError::InvalidInput {
41 field: "signature".to_string(),
42 reason: "Invalid webhook signature".to_string(),
43 });
44 }
45 }
46 (true, None) => {
47 return Err(AppError::InvalidInput {
48 field: "linear-signature".to_string(),
49 reason: "Missing required webhook signature header".to_string(),
50 });
51 }
52 (false, _) => {
53 tracing::warn!("No LINEAR_WEBHOOK_SECRET configured, skipping signature verification");
54 }
55 }
56
57 let payload = webhook.parse_payload(&body).map_err(AppError::Internal)?;
58
59 if payload.entity_type != "Issue" {
60 return Ok(Json(serde_json::json!({
61 "status": "ignored",
62 "reason": "Only Issue events are processed"
63 })));
64 }
65
66 if payload.action == "remove" {
67 return Ok(Json(serde_json::json!({
68 "status": "acknowledged",
69 "action": "remove"
70 })));
71 }
72
73 let external_id = match &payload.data.identifier {
74 Some(id) => format!("linear:{}", id),
75 None => format!("linear:{}", payload.data.id),
76 };
77
78 let content = LinearWebhook::issue_to_content(&payload.data);
79 let tags = LinearWebhook::issue_to_tags(&payload.data);
80 let change_type = LinearWebhook::determine_change_type(&payload.action, &payload.data);
81
82 let user_id =
83 std::env::var("LINEAR_SYNC_USER_ID").unwrap_or_else(|_| "linear-sync".to_string());
84
85 let experience = Experience {
86 content: content.clone(),
87 experience_type: ExperienceType::Task,
88 entities: tags.clone(),
89 ..Default::default()
90 };
91
92 let change_type_enum = match change_type.as_str() {
93 "created" => memory::types::ChangeType::Created,
94 "status_changed" => memory::types::ChangeType::StatusChanged,
95 "tags_updated" => memory::types::ChangeType::TagsUpdated,
96 _ => memory::types::ChangeType::ContentUpdated,
97 };
98
99 let memory_system = state
100 .get_user_memory(&user_id)
101 .map_err(AppError::Internal)?;
102
103 let (memory_id, was_update) = {
104 let memory = memory_system.clone();
105 let ext_id = external_id.clone();
106 let exp = experience.clone();
107 let ct = change_type_enum;
108 let actor_name = payload
109 .actor
110 .as_ref()
111 .and_then(|a| a.name.clone())
112 .unwrap_or_else(|| "linear-webhook".to_string());
113
114 tokio::task::spawn_blocking(move || {
115 let memory_guard = memory.read();
116 memory_guard.upsert(ext_id, exp, ct, Some(actor_name), None)
117 })
118 .await
119 .map_err(|e| AppError::Internal(anyhow::anyhow!("Blocking task panicked: {e}")))?
120 .map_err(AppError::Internal)?
121 };
122
123 Ok(Json(serde_json::json!({
124 "status": "success",
125 "id": memory_id.0.to_string(),
126 "external_id": external_id,
127 "was_update": was_update,
128 "action": payload.action
129 })))
130}
131
132#[tracing::instrument(skip(state, req), fields(user_id = %req.user_id))]
134pub async fn linear_sync(
135 State(state): State<AppState>,
136 Json(req): Json<integrations::linear::LinearSyncRequest>,
137) -> Result<Json<integrations::linear::LinearSyncResponse>, AppError> {
138 use integrations::linear::{LinearClient, LinearSyncResponse, LinearWebhook};
139
140 validation::validate_user_id(&req.user_id).map_validation_err("user_id")?;
141
142 if req.api_key.is_empty() {
143 return Err(AppError::InvalidInput {
144 field: "api_key".to_string(),
145 reason: "Linear API key is required".to_string(),
146 });
147 }
148
149 let client = LinearClient::new(req.api_key.clone());
150
151 let issues = client
152 .fetch_issues(
153 req.team_id.as_deref(),
154 req.updated_after.as_deref(),
155 req.limit,
156 )
157 .await
158 .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch Linear issues: {}", e)))?;
159
160 let total = issues.len();
161 let mut created_count = 0;
162 let mut updated_count = 0;
163 let mut error_count = 0;
164 let mut errors = Vec::new();
165
166 let memory_system = state
167 .get_user_memory(&req.user_id)
168 .map_err(AppError::Internal)?;
169
170 for issue in issues {
171 let external_id = match &issue.identifier {
172 Some(id) => format!("linear:{}", id),
173 None => format!("linear:{}", issue.id),
174 };
175
176 let content = LinearWebhook::issue_to_content(&issue);
177 let tags = LinearWebhook::issue_to_tags(&issue);
178
179 let experience = Experience {
180 content,
181 experience_type: ExperienceType::Task,
182 entities: tags,
183 ..Default::default()
184 };
185
186 let result = {
187 let memory = memory_system.clone();
188 let ext_id = external_id.clone();
189 let exp = experience;
190
191 tokio::task::spawn_blocking(move || {
192 let memory_guard = memory.read();
193 memory_guard.upsert(
194 ext_id,
195 exp,
196 memory::types::ChangeType::ContentUpdated,
197 Some("linear-bulk-sync".to_string()),
198 None,
199 )
200 })
201 .await
202 };
203
204 match result {
205 Ok(Ok((_, was_update))) => {
206 if was_update {
207 updated_count += 1;
208 } else {
209 created_count += 1;
210 }
211 }
212 Ok(Err(e)) => {
213 error_count += 1;
214 errors.push(format!("{}: {}", external_id, e));
215 }
216 Err(e) => {
217 error_count += 1;
218 errors.push(format!("{}: Task panicked: {}", external_id, e));
219 }
220 }
221 }
222
223 Ok(Json(LinearSyncResponse {
224 synced_count: total,
225 created_count,
226 updated_count,
227 error_count,
228 errors,
229 }))
230}
231
232#[tracing::instrument(skip(state, body, headers))]
234pub async fn github_webhook(
235 State(state): State<AppState>,
236 headers: HeaderMap,
237 body: axum::body::Bytes,
238) -> Result<Json<serde_json::Value>, AppError> {
239 use integrations::github::GitHubWebhook;
240
241 let webhook_secret = std::env::var("GITHUB_WEBHOOK_SECRET").ok();
242 let has_secret = webhook_secret.is_some();
243 let webhook = GitHubWebhook::new(webhook_secret);
244
245 let signature = headers
247 .get("x-hub-signature-256")
248 .and_then(|h| h.to_str().ok());
249
250 match (has_secret, signature) {
251 (true, Some(sig)) => {
252 if !webhook
253 .verify_signature(&body, sig)
254 .map_err(AppError::Internal)?
255 {
256 return Err(AppError::InvalidInput {
257 field: "signature".to_string(),
258 reason: "Invalid webhook signature".to_string(),
259 });
260 }
261 }
262 (true, None) => {
263 return Err(AppError::InvalidInput {
264 field: "x-hub-signature-256".to_string(),
265 reason: "Missing required webhook signature header".to_string(),
266 });
267 }
268 (false, _) => {
269 tracing::warn!("No GITHUB_WEBHOOK_SECRET configured, skipping signature verification");
270 }
271 }
272
273 let event_type = headers
274 .get("x-github-event")
275 .and_then(|h| h.to_str().ok())
276 .unwrap_or("unknown");
277
278 if event_type != "issues" && event_type != "pull_request" {
279 return Ok(Json(serde_json::json!({
280 "status": "ignored",
281 "reason": format!("Only issues and pull_request events are processed, got: {}", event_type)
282 })));
283 }
284
285 let payload = webhook.parse_payload(&body).map_err(AppError::Internal)?;
286
287 let user_id =
288 std::env::var("GITHUB_SYNC_USER_ID").unwrap_or_else(|_| "github-sync".to_string());
289
290 let (external_id, content, tags, change_type) = if let Some(issue) = &payload.issue {
291 let ext_id = GitHubWebhook::issue_external_id(&payload.repository, issue.number);
292 let content = GitHubWebhook::issue_to_content(issue, &payload.repository);
293 let tags = GitHubWebhook::issue_to_tags(issue, &payload.repository);
294 let ct = GitHubWebhook::determine_change_type(&payload.action, false);
295 (ext_id, content, tags, ct)
296 } else if let Some(pr) = &payload.pull_request {
297 let ext_id = GitHubWebhook::pr_external_id(&payload.repository, pr.number);
298 let content = GitHubWebhook::pr_to_content(pr, &payload.repository);
299 let tags = GitHubWebhook::pr_to_tags(pr, &payload.repository);
300 let ct = GitHubWebhook::determine_change_type(&payload.action, true);
301 (ext_id, content, tags, ct)
302 } else {
303 return Ok(Json(serde_json::json!({
304 "status": "ignored",
305 "reason": "No issue or pull_request data in payload"
306 })));
307 };
308
309 let experience = Experience {
310 content: content.clone(),
311 experience_type: ExperienceType::Task,
312 entities: tags.clone(),
313 ..Default::default()
314 };
315
316 let change_type_enum = match change_type.as_str() {
317 "created" => memory::types::ChangeType::Created,
318 "status_changed" => memory::types::ChangeType::StatusChanged,
319 "tags_updated" => memory::types::ChangeType::TagsUpdated,
320 _ => memory::types::ChangeType::ContentUpdated,
321 };
322
323 let memory_system = state
324 .get_user_memory(&user_id)
325 .map_err(AppError::Internal)?;
326
327 let (memory_id, was_update) = {
328 let memory = memory_system.clone();
329 let ext_id = external_id.clone();
330 let exp = experience.clone();
331 let ct = change_type_enum;
332 let actor_name = payload
333 .sender
334 .as_ref()
335 .map(|s| s.login.clone())
336 .unwrap_or_else(|| "github-webhook".to_string());
337
338 tokio::task::spawn_blocking(move || {
339 let memory_guard = memory.read();
340 memory_guard.upsert(ext_id, exp, ct, Some(actor_name), None)
341 })
342 .await
343 .map_err(|e| AppError::Internal(anyhow::anyhow!("Blocking task panicked: {e}")))?
344 .map_err(AppError::Internal)?
345 };
346
347 Ok(Json(serde_json::json!({
348 "status": "success",
349 "id": memory_id.0.to_string(),
350 "external_id": external_id,
351 "was_update": was_update,
352 "action": payload.action,
353 "event_type": event_type
354 })))
355}
356
357#[tracing::instrument(skip(state, req), fields(user_id = %req.user_id))]
359pub async fn github_sync(
360 State(state): State<AppState>,
361 Json(req): Json<integrations::github::GitHubSyncRequest>,
362) -> Result<Json<integrations::github::GitHubSyncResponse>, AppError> {
363 use integrations::github::{GitHubClient, GitHubSyncResponse, GitHubWebhook};
364
365 validation::validate_user_id(&req.user_id).map_validation_err("user_id")?;
366
367 if req.token.is_empty() {
368 return Err(AppError::InvalidInput {
369 field: "token".to_string(),
370 reason: "GitHub token is required".to_string(),
371 });
372 }
373
374 let client = GitHubClient::new(req.token.clone());
375
376 let repo_info = client
377 .get_repository(&req.owner, &req.repo)
378 .await
379 .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to get repository: {}", e)))?;
380
381 let mut issues_synced = 0;
382 let mut prs_synced = 0;
383 let mut created_count = 0;
384 let mut updated_count = 0;
385 let mut error_count = 0;
386 let mut errors = Vec::new();
387
388 let memory_system = state
389 .get_user_memory(&req.user_id)
390 .map_err(AppError::Internal)?;
391
392 if req.sync_issues {
394 let issues = client
395 .fetch_issues(&req.owner, &req.repo, &req.state, req.limit)
396 .await
397 .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch issues: {}", e)))?;
398
399 for issue in issues {
400 let external_id = GitHubWebhook::issue_external_id(&repo_info, issue.number);
401 let content = GitHubWebhook::issue_to_content(&issue, &repo_info);
402 let tags = GitHubWebhook::issue_to_tags(&issue, &repo_info);
403
404 let experience = Experience {
405 content,
406 experience_type: ExperienceType::Task,
407 entities: tags,
408 ..Default::default()
409 };
410
411 let result = {
412 let memory = memory_system.clone();
413 let ext_id = external_id.clone();
414 let exp = experience;
415
416 tokio::task::spawn_blocking(move || {
417 let memory_guard = memory.read();
418 memory_guard.upsert(
419 ext_id,
420 exp,
421 memory::types::ChangeType::ContentUpdated,
422 Some("github-bulk-sync".to_string()),
423 None,
424 )
425 })
426 .await
427 };
428
429 match result {
430 Ok(Ok((_, was_update))) => {
431 issues_synced += 1;
432 if was_update {
433 updated_count += 1;
434 } else {
435 created_count += 1;
436 }
437 }
438 Ok(Err(e)) => {
439 error_count += 1;
440 errors.push(format!("{}: {}", external_id, e));
441 }
442 Err(e) => {
443 error_count += 1;
444 errors.push(format!("{}: {}", external_id, e));
445 }
446 }
447 }
448 }
449
450 if req.sync_prs {
452 let prs = client
453 .fetch_pull_requests(&req.owner, &req.repo, &req.state, req.limit)
454 .await
455 .map_err(|e| AppError::Internal(anyhow::anyhow!("Failed to fetch PRs: {}", e)))?;
456
457 for pr in prs {
458 let external_id = GitHubWebhook::pr_external_id(&repo_info, pr.number);
459 let content = GitHubWebhook::pr_to_content(&pr, &repo_info);
460 let tags = GitHubWebhook::pr_to_tags(&pr, &repo_info);
461
462 let experience = Experience {
463 content,
464 experience_type: ExperienceType::Task,
465 entities: tags,
466 ..Default::default()
467 };
468
469 let result = {
470 let memory = memory_system.clone();
471 let ext_id = external_id.clone();
472 let exp = experience;
473
474 tokio::task::spawn_blocking(move || {
475 let memory_guard = memory.read();
476 memory_guard.upsert(
477 ext_id,
478 exp,
479 memory::types::ChangeType::ContentUpdated,
480 Some("github-bulk-sync".to_string()),
481 None,
482 )
483 })
484 .await
485 };
486
487 match result {
488 Ok(Ok((_, was_update))) => {
489 prs_synced += 1;
490 if was_update {
491 updated_count += 1;
492 } else {
493 created_count += 1;
494 }
495 }
496 Ok(Err(e)) => {
497 error_count += 1;
498 errors.push(format!("{}: {}", external_id, e));
499 }
500 Err(e) => {
501 error_count += 1;
502 errors.push(format!("{}: {}", external_id, e));
503 }
504 }
505 }
506 }
507
508 Ok(Json(GitHubSyncResponse {
509 synced_count: issues_synced + prs_synced,
510 issues_synced,
511 prs_synced,
512 commits_synced: 0,
513 created_count,
514 updated_count,
515 error_count,
516 errors,
517 }))
518}