1use crate::AgentError;
8use crate::constants::env::system;
9use crate::utils::http::get_user_agent;
10use std::collections::HashMap;
11use std::path::PathBuf;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicBool, Ordering};
14
15#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
17pub struct TeamMemoryContent {
18 pub entries: HashMap<String, String>,
20 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
22 pub entry_checksums: HashMap<String, String>,
23}
24
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27pub struct TeamMemoryData {
28 pub organization_id: String,
29 pub repo: String,
30 pub version: u32,
31 pub last_modified: String,
32 pub checksum: String,
33 pub content: TeamMemoryContent,
34}
35
36#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct TeamMemoryTooManyEntries {
39 pub error: TeamMemoryTooManyEntriesError,
40}
41
42#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
43pub struct TeamMemoryTooManyEntriesError {
44 pub details: TeamMemoryTooManyEntriesDetails,
45}
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48pub struct TeamMemoryTooManyEntriesDetails {
49 #[serde(rename = "error_code")]
50 pub error_code: String,
51 #[serde(rename = "max_entries")]
52 pub max_entries: u32,
53 #[serde(rename = "received_entries")]
54 pub received_entries: u32,
55}
56
57#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59pub struct SkippedSecretFile {
60 pub path: String,
62 pub rule_id: String,
64 pub label: String,
66}
67
68#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
70pub struct TeamMemorySyncFetchResult {
71 pub success: bool,
72 pub data: Option<TeamMemoryData>,
73 #[serde(default, skip_serializing_if = "Option::is_none")]
75 pub is_empty: Option<bool>,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub not_modified: Option<bool>,
79 #[serde(default, skip_serializing_if = "Option::is_none")]
81 pub checksum: Option<String>,
82 #[serde(default, skip_serializing_if = "Option::is_none")]
83 pub error: Option<String>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
85 pub skip_retry: Option<bool>,
86 #[serde(default, skip_serializing_if = "Option::is_none")]
87 pub error_type: Option<String>,
88 #[serde(default, skip_serializing_if = "Option::is_none")]
89 pub http_status: Option<u16>,
90}
91
92#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
94pub struct TeamMemoryHashesResult {
95 pub success: bool,
96 #[serde(default, skip_serializing_if = "Option::is_none")]
97 pub version: Option<u32>,
98 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub checksum: Option<String>,
100 #[serde(default, skip_serializing_if = "Option::is_none")]
101 pub entry_checksums: Option<HashMap<String, String>>,
102 #[serde(default, skip_serializing_if = "Option::is_none")]
103 pub error: Option<String>,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
105 pub error_type: Option<String>,
106 #[serde(default, skip_serializing_if = "Option::is_none")]
107 pub http_status: Option<u16>,
108}
109
110#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
112pub struct TeamMemorySyncPushResult {
113 pub success: bool,
114 pub files_uploaded: u32,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub checksum: Option<String>,
117 #[serde(default, skip_serializing_if = "Option::is_none")]
119 pub conflict: Option<bool>,
120 #[serde(default, skip_serializing_if = "Option::is_none")]
121 pub error: Option<String>,
122 #[serde(default, skip_serializing_if = "Vec::is_empty")]
124 pub skipped_secrets: Vec<SkippedSecretFile>,
125 #[serde(default, skip_serializing_if = "Option::is_none")]
126 pub error_type: Option<String>,
127 #[serde(default, skip_serializing_if = "Option::is_none")]
128 pub http_status: Option<u16>,
129}
130
131#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
133pub struct TeamMemorySyncUploadResult {
134 pub success: bool,
135 #[serde(default, skip_serializing_if = "Option::is_none")]
136 pub checksum: Option<String>,
137 #[serde(default, skip_serializing_if = "Option::is_none")]
138 pub last_modified: Option<String>,
139 #[serde(default, skip_serializing_if = "Option::is_none")]
141 pub conflict: Option<bool>,
142 #[serde(default, skip_serializing_if = "Option::is_none")]
143 pub error: Option<String>,
144 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub server_error_code: Option<String>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
149 pub server_max_entries: Option<u32>,
150 #[serde(default, skip_serializing_if = "Option::is_none")]
152 pub server_received_entries: Option<u32>,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
154 pub error_type: Option<String>,
155 #[serde(default, skip_serializing_if = "Option::is_none")]
156 pub http_status: Option<u16>,
157}
158
159#[derive(Debug, Clone)]
163pub struct SyncState {
164 pub last_known_checksum: Option<String>,
166 pub server_checksums: HashMap<String, String>,
168 pub server_max_entries: Option<u32>,
170}
171
172impl SyncState {
173 pub fn new() -> Self {
174 Self {
175 last_known_checksum: None,
176 server_checksums: HashMap::new(),
177 server_max_entries: None,
178 }
179 }
180}
181
182impl Default for SyncState {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188pub fn create_sync_state() -> SyncState {
190 SyncState::new()
191}
192
193pub fn hash_content(content: &str) -> String {
197 use sha2::{Digest, Sha256};
198 let mut hasher = Sha256::new();
199 hasher.update(content.as_bytes());
200 let result = hasher.finalize();
201 format!("sha256:{}", hex::encode(result))
202}
203
204pub const TEAM_MEMORY_SYNC_TIMEOUT_MS: u64 = 30_000;
208pub const MAX_FILE_SIZE_BYTES: usize = 250_000;
210pub const MAX_PUT_BODY_BYTES: usize = 200_000;
212pub const MAX_RETRIES: u32 = 3;
214pub const MAX_CONFLICT_RETRIES: u32 = 2;
216
217pub fn get_team_memory_dir() -> PathBuf {
221 let home = std::env::var(system::HOME)
222 .or_else(|_| std::env::var(system::USERPROFILE))
223 .unwrap_or_else(|_| "/tmp".to_string());
224 PathBuf::from(home)
225 .join(".open-agent-sdk")
226 .join("team_memory")
227}
228
229pub fn get_team_memory_path(key: &str) -> PathBuf {
231 if key.contains("..") || key.starts_with('/') {
233 return get_team_memory_dir().join("INVALID");
234 }
235 get_team_memory_dir().join(key)
236}
237
238pub fn validate_team_memory_key(key: &str) -> Result<(), String> {
240 if key.is_empty() {
241 return Err("Key cannot be empty".to_string());
242 }
243 if key.contains("..") {
244 return Err("Key cannot contain '..'".to_string());
245 }
246 if key.starts_with('/') {
247 return Err("Key cannot start with '/'".to_string());
248 }
249 Ok(())
250}
251
252pub async fn read_local_team_memory() -> Result<HashMap<String, String>, AgentError> {
254 let dir = get_team_memory_dir();
255
256 if !dir.exists() {
257 return Ok(HashMap::new());
258 }
259
260 let mut entries = HashMap::new();
261 let mut dirs_to_process: Vec<PathBuf> = vec![dir.clone()];
262
263 while let Some(current_dir) = dirs_to_process.pop() {
264 let mut read_dir = tokio::fs::read_dir(¤t_dir)
265 .await
266 .map_err(AgentError::Io)?;
267
268 while let Some(entry) = read_dir.next_entry().await.map_err(AgentError::Io)? {
269 let path = entry.path();
270 let relative = path
271 .strip_prefix(&dir)
272 .map_err(|_| AgentError::Internal("Failed to get relative path".to_string()))?
273 .to_string_lossy()
274 .to_string();
275
276 if path.is_dir() {
277 dirs_to_process.push(path);
278 } else if path.is_file() {
279 if relative.starts_with('.') {
281 continue;
282 }
283 let content = tokio::fs::read_to_string(&path)
284 .await
285 .map_err(AgentError::Io)?;
286 entries.insert(relative, content);
287 }
288 }
289 }
290
291 Ok(entries)
292}
293
294pub async fn write_local_team_memory(entries: &HashMap<String, String>) -> Result<(), AgentError> {
296 let dir = get_team_memory_dir();
297 tokio::fs::create_dir_all(&dir)
298 .await
299 .map_err(AgentError::Io)?;
300
301 for (key, content) in entries {
302 let path = get_team_memory_path(key);
303 if let Some(parent) = path.parent() {
304 tokio::fs::create_dir_all(parent)
305 .await
306 .map_err(AgentError::Io)?;
307 }
308 tokio::fs::write(&path, content)
309 .await
310 .map_err(AgentError::Io)?;
311 }
312
313 Ok(())
314}
315
316pub async fn delete_local_team_memory_entry(key: &str) -> Result<(), AgentError> {
318 let path = get_team_memory_path(key);
319 if path.exists() {
320 tokio::fs::remove_file(path).await.map_err(AgentError::Io)?;
321 }
322 Ok(())
323}
324
325pub fn compute_delta(
329 local_entries: &HashMap<String, String>,
330 server_checksums: &HashMap<String, String>,
331) -> HashMap<String, String> {
332 let mut delta = HashMap::new();
333
334 for (key, content) in local_entries {
335 let local_hash = hash_content(content);
336 let server_hash = server_checksums.get(key);
337
338 if server_hash.is_none() || server_hash != Some(&local_hash) {
340 delta.insert(key.clone(), content.clone());
341 }
342 }
343
344 delta
345}
346
347pub fn batch_delta_by_bytes(
349 delta: &HashMap<String, String>,
350 max_bytes: usize,
351) -> Vec<HashMap<String, String>> {
352 let mut batches: Vec<HashMap<String, String>> = Vec::new();
353 let mut current_batch: HashMap<String, String> = HashMap::new();
354 let mut current_bytes: usize = 0;
355
356 let mut keys: Vec<&String> = delta.keys().collect();
358 keys.sort();
359
360 for key in keys {
361 let content = delta.get(key).unwrap();
362 let entry_bytes = key.len() + content.len();
363
364 if entry_bytes > max_bytes {
366 if !current_batch.is_empty() {
368 batches.push(current_batch);
369 current_batch = HashMap::new();
370 current_bytes = 0;
371 }
372 let mut single = HashMap::new();
374 single.insert(key.clone(), content.clone());
375 batches.push(single);
376 continue;
377 }
378
379 if current_bytes + entry_bytes > max_bytes && !current_batch.is_empty() {
381 batches.push(current_batch);
382 current_batch = HashMap::new();
383 current_bytes = 0;
384 }
385
386 current_batch.insert(key.clone(), content.clone());
387 current_bytes += entry_bytes;
388 }
389
390 if !current_batch.is_empty() {
392 batches.push(current_batch);
393 }
394
395 batches
396}
397
398fn get_team_memory_api_base() -> String {
402 std::env::var("AI_API_BASE_URL")
403 .ok()
404 .filter(|u| !u.is_empty())
405 .unwrap_or_else(|| "https://api.anthropic.com".to_string())
406}
407
408fn get_team_memory_auth_token() -> Option<String> {
410 std::env::var("AI_CODE_OAUTH_TOKEN")
411 .ok()
412 .filter(|t| !t.is_empty())
413 .or_else(|| {
414 std::env::var("AI_OAUTH_TOKEN")
415 .ok()
416 .filter(|t| !t.is_empty())
417 })
418 .or_else(|| {
419 std::env::var("AI_AUTH_TOKEN")
420 .ok()
421 .filter(|t| !t.is_empty())
422 })
423}
424
425fn build_team_memory_headers(
427 etag: Option<&str>,
428 content_type: Option<&str>,
429) -> Result<reqwest::header::HeaderMap, String> {
430 let mut headers = reqwest::header::HeaderMap::new();
431 headers.insert(
432 "Content-Type",
433 reqwest::header::HeaderValue::from_static("application/json"),
434 );
435 headers.insert(
436 "anthropic-version",
437 reqwest::header::HeaderValue::from_static("2025-04-20"),
438 );
439
440 if let Some(token) = get_team_memory_auth_token() {
441 let auth_value = format!("Bearer {}", token);
442 headers.insert(
443 "Authorization",
444 reqwest::header::HeaderValue::from_str(&auth_value)
445 .map_err(|e| format!("Invalid auth header: {}", e))?,
446 );
447 }
448
449 if let Some(etag_value) = etag {
450 headers.insert(
451 "If-None-Match",
452 reqwest::header::HeaderValue::from_str(etag_value)
453 .map_err(|e| format!("Invalid ETag header: {}", e))?,
454 );
455 }
456
457 if let Some(ct) = content_type {
458 headers.insert(
459 "Content-Type",
460 reqwest::header::HeaderValue::from_str(ct)
461 .map_err(|e| format!("Invalid Content-Type header: {}", e))?,
462 );
463 }
464
465 headers.insert(
466 "User-Agent",
467 reqwest::header::HeaderValue::from_str(&get_user_agent())
468 .map_err(|e| format!("Invalid User-Agent header: {}", e))?,
469 );
470
471 Ok(headers)
472}
473
474fn build_team_memory_url(repo_slug: &str, view: Option<&str>) -> String {
476 let base = get_team_memory_api_base();
477 let mut url = format!("{}/api/claude_code/team_memory", base);
478 let mut query_params: Vec<(String, String)> = vec![("repo".to_string(), repo_slug.to_string())];
479
480 if let Some(v) = view {
481 query_params.push(("view".to_string(), v.to_string()));
482 }
483
484 if !query_params.is_empty() {
485 url.push('?');
486 url.push_str(
487 &query_params
488 .iter()
489 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
490 .collect::<Vec<_>>()
491 .join("&"),
492 );
493 }
494
495 url
496}
497
498pub fn is_team_memory_sync_available() -> bool {
500 get_team_memory_auth_token().is_some()
501}
502
503pub async fn pull_team_memory(
505 state: &mut SyncState,
506 repo_slug: &str,
507) -> Result<TeamMemorySyncFetchResult, AgentError> {
508 if !is_team_memory_sync_available() {
510 return Ok(TeamMemorySyncFetchResult {
511 success: false,
512 data: None,
513 is_empty: None,
514 not_modified: None,
515 checksum: None,
516 error: Some("No OAuth token available for team memory sync".to_string()),
517 skip_retry: Some(true),
518 error_type: Some("auth".to_string()),
519 http_status: None,
520 });
521 }
522
523 let hashes_url = build_team_memory_url(repo_slug, Some("hashes"));
525 let headers = build_team_memory_headers(state.last_known_checksum.as_deref(), None)
526 .map_err(|e| AgentError::Internal(e))?;
527
528 let client = reqwest::Client::builder()
529 .timeout(std::time::Duration::from_millis(
530 TEAM_MEMORY_SYNC_TIMEOUT_MS,
531 ))
532 .build()
533 .map_err(|e| AgentError::Internal(e.to_string()))?;
534
535 let hashes_response = match client
537 .get(&hashes_url)
538 .headers(headers.clone())
539 .send()
540 .await
541 {
542 Ok(r) => r,
543 Err(e) => {
544 let is_timeout = e.is_timeout() || e.is_connect();
545 return Ok(TeamMemorySyncFetchResult {
546 success: false,
547 data: None,
548 is_empty: None,
549 not_modified: None,
550 checksum: None,
551 error: Some(format!("Team memory request failed: {}", e)),
552 skip_retry: Some(!is_timeout),
553 error_type: Some(if is_timeout { "timeout" } else { "network" }.to_string()),
554 http_status: None,
555 });
556 }
557 };
558
559 let hashes_status = hashes_response.status();
560
561 if hashes_status == 304 {
563 log::debug!("Team memory not modified (304) for repo: {}", repo_slug);
564 return Ok(TeamMemorySyncFetchResult {
565 success: true,
566 data: None,
567 is_empty: Some(false),
568 not_modified: Some(true),
569 checksum: state.last_known_checksum.clone(),
570 error: None,
571 skip_retry: Some(true),
572 error_type: None,
573 http_status: Some(304),
574 });
575 }
576
577 if hashes_status == 404 {
579 log::debug!("No team memory exists for repo: {}", repo_slug);
580 return Ok(TeamMemorySyncFetchResult {
581 success: true,
582 data: None,
583 is_empty: Some(true),
584 not_modified: Some(false),
585 checksum: None,
586 error: None,
587 skip_retry: Some(true),
588 error_type: None,
589 http_status: Some(404),
590 });
591 }
592
593 if !hashes_status.is_success() {
594 let body = hashes_response.text().await.unwrap_or_default();
595 log::debug!(
596 "Team memory hashes probe failed with status {}: {}",
597 hashes_status,
598 body
599 );
600 return Ok(TeamMemorySyncFetchResult {
601 success: false,
602 data: None,
603 is_empty: None,
604 not_modified: None,
605 checksum: None,
606 error: Some(format!(
607 "Team memory probe failed with status {}: {}",
608 hashes_status, body
609 )),
610 skip_retry: Some(hashes_status.is_client_error()),
611 error_type: Some("api".to_string()),
612 http_status: Some(hashes_status.as_u16()),
613 });
614 }
615
616 let hashes_result = match hashes_response.json::<TeamMemoryHashesResult>().await {
618 Ok(r) => r,
619 Err(e) => {
620 return Ok(TeamMemorySyncFetchResult {
621 success: false,
622 data: None,
623 is_empty: None,
624 not_modified: None,
625 checksum: None,
626 error: Some(format!("Failed to parse team memory hashes: {}", e)),
627 skip_retry: Some(false),
628 error_type: Some("parse".to_string()),
629 http_status: Some(hashes_status.as_u16()),
630 });
631 }
632 };
633
634 if let Some(version) = hashes_result.version {
636 log::debug!(
637 "Team memory version: {}, checksum: {:?}",
638 version,
639 hashes_result.checksum
640 );
641 }
642
643 if let Some(ref entry_checksums) = hashes_result.entry_checksums {
645 state.server_checksums = entry_checksums.clone();
646 }
647 if let Some(ref checksum) = hashes_result.checksum {
648 state.last_known_checksum = Some(checksum.clone());
649 }
650
651 let full_url = build_team_memory_url(repo_slug, None);
653 let full_headers = build_team_memory_headers(state.last_known_checksum.as_deref(), None)
654 .map_err(|e| AgentError::Internal(e))?;
655
656 let full_response = match client.get(&full_url).headers(full_headers).send().await {
657 Ok(r) => r,
658 Err(e) => {
659 let is_timeout = e.is_timeout() || e.is_connect();
660 return Ok(TeamMemorySyncFetchResult {
661 success: false,
662 data: None,
663 is_empty: None,
664 not_modified: None,
665 checksum: state.last_known_checksum.clone(),
666 error: Some(format!("Team memory fetch failed: {}", e)),
667 skip_retry: Some(!is_timeout),
668 error_type: Some(if is_timeout { "timeout" } else { "network" }.to_string()),
669 http_status: None,
670 });
671 }
672 };
673
674 let full_status = full_response.status();
675
676 if full_status == 304 {
678 log::debug!(
679 "Team memory content not modified (304) for repo: {}",
680 repo_slug
681 );
682 return Ok(TeamMemorySyncFetchResult {
683 success: true,
684 data: None,
685 is_empty: Some(false),
686 not_modified: Some(true),
687 checksum: state.last_known_checksum.clone(),
688 error: None,
689 skip_retry: Some(true),
690 error_type: None,
691 http_status: Some(304),
692 });
693 }
694
695 if full_status == 404 {
697 return Ok(TeamMemorySyncFetchResult {
698 success: true,
699 data: None,
700 is_empty: Some(true),
701 not_modified: Some(false),
702 checksum: None,
703 error: None,
704 skip_retry: Some(true),
705 error_type: None,
706 http_status: Some(404),
707 });
708 }
709
710 let response_etag = full_response
712 .headers()
713 .get(reqwest::header::ETAG)
714 .and_then(|v| v.to_str().ok())
715 .map(String::from);
716
717 if let Some(ref etag) = response_etag {
718 state.last_known_checksum = Some(etag.clone());
719 }
720
721 if !full_status.is_success() {
722 let body = full_response.text().await.unwrap_or_default();
723 return Ok(TeamMemorySyncFetchResult {
724 success: false,
725 data: None,
726 is_empty: None,
727 not_modified: None,
728 checksum: state.last_known_checksum.clone(),
729 error: Some(format!(
730 "Team memory fetch failed with status {}: {}",
731 full_status, body
732 )),
733 skip_retry: Some(full_status.is_client_error()),
734 error_type: Some("api".to_string()),
735 http_status: Some(full_status.as_u16()),
736 });
737 }
738
739 match full_response.json::<TeamMemoryData>().await {
741 Ok(data) => {
742 log::info!(
743 "Successfully pulled team memory for repo: {}, version: {}, entries: {}",
744 repo_slug,
745 data.version,
746 data.content.entries.len()
747 );
748
749 state.last_known_checksum = Some(data.checksum.clone());
751 state.server_checksums = data.content.entry_checksums.clone();
752
753 Ok(TeamMemorySyncFetchResult {
754 success: true,
755 data: Some(data),
756 is_empty: Some(false),
757 not_modified: Some(false),
758 checksum: state.last_known_checksum.clone(),
759 error: None,
760 skip_retry: None,
761 error_type: None,
762 http_status: Some(full_status.as_u16()),
763 })
764 }
765 Err(e) => Ok(TeamMemorySyncFetchResult {
766 success: false,
767 data: None,
768 is_empty: None,
769 not_modified: None,
770 checksum: state.last_known_checksum.clone(),
771 error: Some(format!("Failed to parse team memory response: {}", e)),
772 skip_retry: Some(false),
773 error_type: Some("parse".to_string()),
774 http_status: Some(full_status.as_u16()),
775 }),
776 }
777}
778
779pub async fn push_team_memory(
781 state: &mut SyncState,
782 repo_slug: &str,
783 entries: &HashMap<String, String>,
784) -> Result<TeamMemorySyncPushResult, AgentError> {
785 if !is_team_memory_sync_available() {
787 return Ok(TeamMemorySyncPushResult {
788 success: false,
789 files_uploaded: 0,
790 checksum: None,
791 conflict: None,
792 error: Some("No OAuth token available for team memory sync".to_string()),
793 skipped_secrets: Vec::new(),
794 error_type: Some("auth".to_string()),
795 http_status: None,
796 });
797 }
798
799 let skipped_secrets = scan_entries_for_secrets(entries);
801 let entries_to_upload: HashMap<String, String> = entries
802 .iter()
803 .filter(|(path, _)| !skipped_secrets.iter().any(|s| s.path == **path))
804 .map(|(k, v)| (k.clone(), v.clone()))
805 .collect();
806
807 if entries_to_upload.is_empty() {
808 return Ok(TeamMemorySyncPushResult {
809 success: true,
810 files_uploaded: 0,
811 checksum: state.last_known_checksum.clone(),
812 conflict: None,
813 error: None,
814 skipped_secrets,
815 error_type: None,
816 http_status: None,
817 });
818 }
819
820 if entries_to_upload.len() > 1000 {
822 return Ok(TeamMemorySyncPushResult {
823 success: false,
824 files_uploaded: 0,
825 checksum: None,
826 conflict: None,
827 error: Some(format!(
828 "Too many entries: {} (max: 1000)",
829 entries_to_upload.len()
830 )),
831 skipped_secrets,
832 error_type: Some("too_many_entries".to_string()),
833 http_status: Some(413),
834 });
835 }
836
837 let body = TeamMemoryContent {
839 entries: entries_to_upload.clone(),
840 entry_checksums: entries_to_upload
841 .iter()
842 .map(|(k, v)| (k.clone(), hash_content(v)))
843 .collect(),
844 };
845
846 let url = build_team_memory_url(repo_slug, None);
847 let mut headers = build_team_memory_headers(None, None).map_err(|e| AgentError::Internal(e))?;
848
849 if let Some(ref checksum) = state.last_known_checksum {
851 headers.insert(
852 "If-Match",
853 reqwest::header::HeaderValue::from_str(checksum)
854 .map_err(|e| AgentError::Internal(e.to_string()))?,
855 );
856 }
857
858 let client = reqwest::Client::builder()
859 .timeout(std::time::Duration::from_millis(
860 TEAM_MEMORY_SYNC_TIMEOUT_MS,
861 ))
862 .build()
863 .map_err(|e| AgentError::Internal(e.to_string()))?;
864
865 let response = match client.put(&url).headers(headers).json(&body).send().await {
866 Ok(r) => r,
867 Err(e) => {
868 let is_timeout = e.is_timeout() || e.is_connect();
869 return Ok(TeamMemorySyncPushResult {
870 success: false,
871 files_uploaded: 0,
872 checksum: None,
873 conflict: None,
874 error: Some(format!("Team memory push failed: {}", e)),
875 skipped_secrets,
876 error_type: Some(if is_timeout { "timeout" } else { "network" }.to_string()),
877 http_status: None,
878 });
879 }
880 };
881
882 let status = response.status();
883
884 if status == 412 {
886 log::debug!("Team memory conflict (412) for repo: {}", repo_slug);
887 return Ok(TeamMemorySyncPushResult {
888 success: false,
889 files_uploaded: 0,
890 checksum: None,
891 conflict: Some(true),
892 error: Some("Conflict: team memory was modified by another client".to_string()),
893 skipped_secrets,
894 error_type: Some("conflict".to_string()),
895 http_status: Some(412),
896 });
897 }
898
899 if status == 413 {
901 let body_text = response.text().await.unwrap_or_default();
902 let max_entries =
903 if let Ok(error_body) = serde_json::from_str::<TeamMemoryTooManyEntries>(&body_text) {
904 Some(error_body.error.details.max_entries)
905 } else {
906 None
907 };
908
909 if let Some(max) = max_entries {
910 state.server_max_entries = Some(max);
911 }
912
913 return Ok(TeamMemorySyncPushResult {
914 success: false,
915 files_uploaded: 0,
916 checksum: None,
917 conflict: None,
918 error: Some(format!(
919 "Payload too large: {} entries (max: {:?})",
920 entries_to_upload.len(),
921 max_entries
922 )),
923 skipped_secrets,
924 error_type: Some("payload_too_large".to_string()),
925 http_status: Some(413),
926 });
927 }
928
929 let response_etag = response
931 .headers()
932 .get(reqwest::header::ETAG)
933 .and_then(|v| v.to_str().ok())
934 .map(String::from);
935
936 if let Some(ref etag) = response_etag {
937 state.last_known_checksum = Some(etag.clone());
938 }
939
940 if !status.is_success() {
941 let body_text = response.text().await.unwrap_or_default();
942 return Ok(TeamMemorySyncPushResult {
943 success: false,
944 files_uploaded: 0,
945 checksum: None,
946 conflict: None,
947 error: Some(format!(
948 "Team memory push failed with status {}: {}",
949 status, body_text
950 )),
951 skipped_secrets,
952 error_type: Some("api".to_string()),
953 http_status: Some(status.as_u16()),
954 });
955 }
956
957 let files_uploaded = entries_to_upload.len() as u32;
958 log::info!(
959 "Successfully pushed {} team memory files for repo: {}",
960 files_uploaded,
961 repo_slug
962 );
963
964 Ok(TeamMemorySyncPushResult {
965 success: true,
966 files_uploaded,
967 checksum: state.last_known_checksum.clone(),
968 conflict: None,
969 error: None,
970 skipped_secrets,
971 error_type: None,
972 http_status: Some(status.as_u16()),
973 })
974}
975
976pub async fn sync_team_memory(
978 state: &mut SyncState,
979 repo_slug: &str,
980) -> Result<TeamMemorySyncPushResult, AgentError> {
981 let pull_result = pull_team_memory(state, repo_slug).await?;
983
984 if !pull_result.success {
985 return Ok(TeamMemorySyncPushResult {
986 success: false,
987 files_uploaded: 0,
988 checksum: None,
989 conflict: None,
990 error: pull_result.error,
991 skipped_secrets: Vec::new(),
992 error_type: pull_result.error_type,
993 http_status: pull_result.http_status,
994 });
995 }
996
997 let local_entries = read_local_team_memory().await?;
999
1000 let delta = compute_delta(&local_entries, &state.server_checksums);
1002
1003 if delta.is_empty() {
1004 return Ok(TeamMemorySyncPushResult {
1005 success: true,
1006 files_uploaded: 0,
1007 checksum: state.last_known_checksum.clone(),
1008 conflict: None,
1009 error: None,
1010 skipped_secrets: Vec::new(),
1011 error_type: None,
1012 http_status: None,
1013 });
1014 }
1015
1016 push_team_memory(state, repo_slug, &delta).await
1018}
1019
1020struct SecretMatch {
1026 rule_id: String,
1027 label: String,
1028}
1029
1030fn rule_id_to_label(rule_id: &str) -> String {
1032 let special = [
1033 ("aws", "AWS"), ("gcp", "GCP"), ("api", "API"), ("pat", "PAT"),
1034 ("ad", "AD"), ("tf", "TF"), ("oauth", "OAuth"), ("npm", "NPM"),
1035 ("pypi", "PyPI"), ("jwt", "JWT"), ("github", "GitHub"),
1036 ("gitlab", "GitLab"), ("openai", "OpenAI"), ("digitalocean", "DigitalOcean"),
1037 ("huggingface", "HuggingFace"), ("hashicorp", "HashiCorp"),
1038 ("sendgrid", "SendGrid"),
1039 ];
1040 rule_id.split('-')
1041 .map(|part| {
1042 if let Some(&(_, canonical)) = special.iter().find(|&&(k, _)| k == part) {
1043 canonical.to_string()
1044 } else {
1045 let mut s = String::new();
1046 let mut chars = part.chars();
1047 if let Some(c) = chars.next() {
1048 s.push(c.to_ascii_uppercase());
1049 for ch in chars {
1050 s.push(ch);
1051 }
1052 }
1053 s
1054 }
1055 })
1056 .collect()
1057}
1058
1059fn scan_content_for_secrets(content: &str) -> Vec<SecretMatch> {
1060 let qt = chr(39); let dq = chr(34); let bt = chr(96); let q = format!("{}{}{}", qt, dq, bt);
1066
1067 let rules: Vec<(&str, String)> = vec![
1068 ("aws-access-token", r"\b((?:A3T[A-Z0-9]|AKIA|ASIA|ABIA|ACCA)[A-Z2-7]{16})\b".to_string()),
1070 ("gcp-api-key", r"\b(AIza[\w-]{35})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1071 ("azure-ad-client-secret", r"(?:^|[__Q__]\s>=:(,)])([a-zA-Z0-9_~.]{3}\dQ~[a-zA-Z0-9_~.-]{31,34})(?:$|[__Q__]\s<),])".replace("__Q__", &q)),
1072 ("digitalocean-pat", r"\b(dop_v1_[a-f0-9]{64})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1073 ("digitalocean-access-token", r"\b(doo_v1_[a-f0-9]{64})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1074 ("anthropic-api-key", r"\b(sk-ant-api03-[a-zA-Z0-9_\-]{93}AA)(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1076 ("anthropic-admin-api-key", r"\b(sk-ant-admin01-[a-zA-Z0-9_\-]{93}AA)(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1077 ("openai-api-key", r"\b(sk-(?:proj|svcacct|admin)-(?:[A-Za-z0-9_-]{74}|[A-Za-z0-9_-]{58})T3BlbkFJ(?:[A-Za-z0-9_-]{74}|[A-Za-z0-9_-]{58})\b|sk-[a-zA-Z0-9]{20}T3BlbkFJ[a-zA-Z0-9]{20})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1078 ("huggingface-access-token", r"\b(hf_[a-zA-Z]{34})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1079 ("github-pat", r"ghp_[0-9a-zA-Z]{36}".to_string()),
1081 ("github-fine-grained-pat", r"github_pat_\w{82}".to_string()),
1082 ("github-app-token", r"(?:ghu|ghs)_[0-9a-zA-Z]{36}".to_string()),
1083 ("github-oauth", r"gho_[0-9a-zA-Z]{36}".to_string()),
1084 ("github-refresh-token", r"ghr_[0-9a-zA-Z]{36}".to_string()),
1085 ("gitlab-pat", r"glpat-[\w-]{20}".to_string()),
1086 ("gitlab-deploy-token", r"gldt-[0-9a-zA-Z_\-]{20}".to_string()),
1087 ("slack-bot-token", r"xoxb-[0-9]{10,13}-[0-9]{10,13}[a-zA-Z0-9-]*".to_string()),
1089 ("slack-user-token", r"xox[pe](?:-[0-9]{10,13}){3}-[a-zA-Z0-9-]{28,34}".to_string()),
1090 ("slack-app-token", r"(?i)xapp-\d-[A-Z0-9]+-\d+-[a-z0-9]+".to_string()),
1091 ("twilio-api-key", r"SK[0-9a-fA-F]{32}".to_string()),
1092 ("sendgrid-api-token", r"\b(SG\.[a-zA-Z0-9=_\-.]{66})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1093 ("npm-access-token", r"\b(npm_[a-zA-Z0-9]{36})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1095 ("pypi-upload-token", r"pypi-AgEIcHlwaS5vcmc[\w-]{50,1000}".to_string()),
1096 ("databricks-api-token", r"\b(dapi[a-f0-9]{32}(?:-\d)?)(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1097 ("hashicorp-tf-api-token", r"[a-zA-Z0-9]{14}\.atlasv1\.[a-zA-Z0-9\-_=]{60,70}".to_string()),
1098 ("pulumi-api-token", r"\b(pul-[a-f0-9]{40})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1099 ("postman-api-token", r"\b(PMAK-[a-fA-F0-9]{24}-[a-fA-F0-9]{34})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1100 ("grafana-api-key", r"\b(eyJrIjoi[A-Za-z0-9+/]{70,400}={0,3})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1102 ("grafana-cloud-api-token", r"\b(glc_[A-Za-z0-9+/]{32,400}={0,3})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1103 ("grafana-service-account-token", r"\b(glsa_[A-Za-z0-9]{32}_[A-Fa-f0-9]{8})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1104 ("sentry-user-token", r"\b(sntryu_[a-f0-9]{64})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1105 ("sentry-org-token", r"\bsntrys_eyJpYXQiO[a-zA-Z0-9+/]{10,200}(?:LCJyZWdpb25fdXJs|InJlZ2lvbl91cmwi|cmVnaW9uX3VybCI6)[a-zA-Z0-9+/]{10,200}={0,2}_[a-zA-Z0-9+/]{43}".to_string()),
1106 ("stripe-access-token", r"\b((?:sk|rk)_(?:test|live|prod)_[a-zA-Z0-9]{10,99})(?:[__Q__]\s;]|\\[nr]|$)".replace("__Q__", &q)),
1108 ("shopify-access-token", r"shpat_[a-fA-F0-9]{32}".to_string()),
1109 ("shopify-shared-secret", r"shpss_[a-fA-F0-9]{32}".to_string()),
1110 ("private-key", r"(?i)-----BEGIN[ A-Z0-9_-]{0,100}PRIVATE KEY(?: BLOCK)?-----[\s\S-]{64,}?-----END[ A-Z0-9_-]{0,100}PRIVATE KEY(?: BLOCK)?-----".to_string()),
1112 ];
1113
1114 let mut matches = Vec::new();
1115 let mut seen = std::collections::HashSet::new();
1116
1117 for (rule_id, pattern) in &rules {
1118 if seen.contains(*rule_id) {
1119 continue;
1120 }
1121 if let Ok(re) = regex::Regex::new(pattern) {
1122 if re.is_match(content) {
1123 seen.insert(*rule_id);
1124 matches.push(SecretMatch {
1125 rule_id: (*rule_id).to_string(),
1126 label: rule_id_to_label(rule_id),
1127 });
1128 }
1129 }
1130 }
1131
1132 matches
1133}
1134
1135fn chr(code: u32) -> String {
1136 std::char::from_u32(code).map(|c| c.to_string()).unwrap_or_default()
1137}
1138
1139pub fn scan_for_secrets(content: &str, path: &str) -> Vec<SkippedSecretFile> {
1141 scan_content_for_secrets(content)
1142 .into_iter()
1143 .map(|m| SkippedSecretFile {
1144 path: path.to_string(),
1145 rule_id: m.rule_id,
1146 label: m.label,
1147 })
1148 .collect()
1149}
1150
1151pub fn scan_entries_for_secrets(entries: &HashMap<String, String>) -> Vec<SkippedSecretFile> {
1153 let mut skipped = Vec::new();
1154
1155 for (path, content) in entries {
1156 skipped.extend(scan_for_secrets(content, path));
1157 }
1158
1159 skipped
1160}
1161
1162static TEAM_MEMORY_ENABLED: AtomicBool = AtomicBool::new(false);
1166
1167pub fn is_team_memory_enabled() -> bool {
1169 TEAM_MEMORY_ENABLED.load(Ordering::SeqCst)
1170}
1171
1172pub fn enable_team_memory() {
1174 TEAM_MEMORY_ENABLED.store(true, Ordering::SeqCst);
1175}
1176
1177pub fn disable_team_memory() {
1179 TEAM_MEMORY_ENABLED.store(false, Ordering::SeqCst);
1180}
1181
1182static LAST_SYNC_ERROR: Mutex<Option<String>> = Mutex::new(None);
1184
1185pub fn set_last_sync_error(error: Option<String>) {
1187 *LAST_SYNC_ERROR.lock().unwrap() = error;
1188}
1189
1190pub fn get_last_sync_error() -> Option<String> {
1192 LAST_SYNC_ERROR.lock().unwrap().clone()
1193}
1194
1195pub fn reset_team_memory_for_testing() {
1197 disable_team_memory();
1198 *LAST_SYNC_ERROR.lock().unwrap() = None;
1199}
1200
1201