1use super::AppState;
9use super::api::require_auth;
10use super::api_agents::build_kumiho_client;
11use super::kumiho_client::{KumihoClient, KumihoError};
12
13fn normalize_kref(raw: &str) -> String {
15 let stripped = raw.strip_prefix("kref://").unwrap_or(raw);
16 format!("kref://{stripped}")
17}
18use axum::{
19 extract::{Path, Query, State},
20 http::{HeaderMap, StatusCode},
21 response::{IntoResponse, Json},
22};
23use parking_lot::Mutex;
24use serde::{Deserialize, Serialize};
25use std::collections::{HashMap, HashSet, VecDeque};
26use std::sync::OnceLock;
27use std::time::Instant;
28
29struct TeamCache {
32 teams: Vec<TeamResponse>,
33 include_deprecated: bool,
34 fetched_at: Instant,
35}
36
37static TEAM_CACHE: OnceLock<Mutex<Option<TeamCache>>> = OnceLock::new();
38const CACHE_TTL_SECS: u64 = 30;
39
40fn get_cached_teams(include_deprecated: bool) -> Option<Vec<TeamResponse>> {
41 let lock = TEAM_CACHE.get_or_init(|| Mutex::new(None));
42 let cache = lock.lock();
43 if let Some(ref c) = *cache {
44 if c.include_deprecated == include_deprecated
45 && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
46 {
47 return Some(c.teams.clone());
48 }
49 }
50 None
51}
52
53fn set_cached_teams(teams: &[TeamResponse], include_deprecated: bool) {
54 let lock = TEAM_CACHE.get_or_init(|| Mutex::new(None));
55 let mut cache = lock.lock();
56 *cache = Some(TeamCache {
57 teams: teams.to_vec(),
58 include_deprecated,
59 fetched_at: Instant::now(),
60 });
61}
62
63pub fn invalidate_team_cache() {
64 if let Some(lock) = TEAM_CACHE.get() {
65 let mut cache = lock.lock();
66 *cache = None;
67 }
68}
69
70const TEAM_SPACE_NAME: &str = "Teams";
72
73fn team_project(state: &AppState) -> String {
75 state.config.lock().kumiho.harness_project.clone()
76}
77
78fn team_space_path(state: &AppState) -> String {
80 format!("/{}/{}", team_project(state), TEAM_SPACE_NAME)
81}
82
83#[derive(Deserialize)]
86pub struct TeamListQuery {
87 #[serde(default)]
89 pub include_deprecated: bool,
90 pub page: Option<u32>,
92 pub per_page: Option<u32>,
94}
95
96#[derive(Deserialize)]
97pub struct CreateTeamBody {
98 pub name: String,
99 pub description: Option<String>,
100 #[serde(default)]
101 pub members: Vec<String>, #[serde(default)]
103 pub edges: Vec<TeamEdgeBody>,
104}
105
106#[derive(Deserialize)]
107pub struct TeamEdgeBody {
108 pub from_kref: String,
109 pub to_kref: String,
110 pub edge_type: String, }
112
113#[derive(Deserialize)]
114pub struct DeprecateBody {
115 pub kref: String,
116 pub deprecated: bool,
117}
118
119#[derive(Debug, Serialize, Clone)]
122pub struct TeamResponse {
123 pub kref: String,
124 pub name: String,
125 pub description: String,
126 pub deprecated: bool,
127 pub created_at: Option<String>,
128 pub members: Vec<TeamMemberResponse>,
129 pub edges: Vec<TeamEdgeResponse>,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub member_count: Option<u32>,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub member_names: Option<Vec<String>>,
135 #[serde(skip_serializing_if = "Option::is_none")]
136 pub edge_count: Option<u32>,
137}
138
139#[derive(Debug, Serialize, Clone)]
140pub struct TeamMemberResponse {
141 pub kref: String,
142 pub name: String,
143 pub role: String,
144 pub agent_type: String,
145 pub identity: String,
146 pub expertise: Vec<String>,
147}
148
149#[derive(Debug, Serialize, Clone)]
150pub struct TeamEdgeResponse {
151 pub from_kref: String,
152 pub to_kref: String,
153 pub edge_type: String,
154}
155
156fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
160 match &e {
161 KumihoError::Unreachable(_) => (
162 StatusCode::SERVICE_UNAVAILABLE,
163 Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
164 ),
165 KumihoError::Api { status, body } => {
166 let code = if *status == 401 || *status == 403 {
167 StatusCode::BAD_GATEWAY
168 } else {
169 StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
170 };
171 (
172 code,
173 Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
174 )
175 }
176 KumihoError::Decode(msg) => (
177 StatusCode::BAD_GATEWAY,
178 Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
179 ),
180 }
181}
182
183async fn build_team_response(
186 client: &KumihoClient,
187 bundle_kref: &str,
188 name: &str,
189 description: &str,
190 deprecated: bool,
191 created_at: Option<String>,
192) -> Result<TeamResponse, KumihoError> {
193 let members_resp = match client.list_bundle_members(bundle_kref).await {
195 Ok(resp) => resp,
196 Err(_) => {
197 return Ok(TeamResponse {
199 kref: bundle_kref.to_string(),
200 name: name.to_string(),
201 description: description.to_string(),
202 deprecated,
203 created_at,
204 members: Vec::new(),
205 edges: Vec::new(),
206 member_count: None,
207 member_names: None,
208 edge_count: None,
209 });
210 }
211 };
212
213 let member_krefs: Vec<String> = members_resp
215 .members
216 .iter()
217 .map(|m| m.item_kref.clone())
218 .collect();
219
220 let rev_map = client
222 .batch_get_revisions(&member_krefs, "published")
223 .await
224 .unwrap_or_default();
225 let missing: Vec<String> = member_krefs
226 .iter()
227 .filter(|k| !rev_map.contains_key(*k))
228 .cloned()
229 .collect();
230 let latest_map = if !missing.is_empty() {
231 client
232 .batch_get_revisions(&missing, "latest")
233 .await
234 .unwrap_or_default()
235 } else {
236 std::collections::HashMap::new()
237 };
238
239 let mut member_responses = Vec::with_capacity(member_krefs.len());
240 for member_kref in &member_krefs {
241 let rev = rev_map
242 .get(member_kref)
243 .or_else(|| latest_map.get(member_kref));
244 let meta = rev.map(|r| &r.metadata);
245 let get =
246 |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
247
248 let expertise_str = get("expertise");
249 let expertise: Vec<String> = if expertise_str.is_empty() {
250 Vec::new()
251 } else {
252 expertise_str
253 .split(',')
254 .map(|s| s.trim().to_string())
255 .collect()
256 };
257
258 let item_name = {
259 let name_from_kref = member_kref
260 .rsplit('/')
261 .next()
262 .and_then(|s| s.split('.').next())
263 .unwrap_or("")
264 .to_string();
265 if name_from_kref.is_empty() {
266 member_kref.clone()
267 } else {
268 name_from_kref
269 }
270 };
271
272 member_responses.push(TeamMemberResponse {
273 kref: member_kref.clone(),
274 name: item_name,
275 role: get("role"),
276 agent_type: get("agent_type"),
277 identity: get("identity"),
278 expertise,
279 });
280 }
281
282 let member_kref_set: HashSet<String> = member_krefs.iter().cloned().collect();
286 let edge_handles: Vec<_> = member_krefs
287 .iter()
288 .filter_map(|kref| {
289 let rev = rev_map.get(kref).or_else(|| latest_map.get(kref));
290 rev.map(|r| {
291 let client = client.clone();
292 let item_kref = kref.clone();
293 let rev_kref = r.kref.clone();
294 tokio::spawn(async move {
295 let edges = tokio::time::timeout(
296 std::time::Duration::from_secs(8),
297 client.list_edges(&rev_kref, None, Some("outgoing")),
298 )
299 .await
300 .ok()
301 .and_then(|r| r.ok())
302 .unwrap_or_default();
303 (item_kref, edges)
304 })
305 })
306 })
307 .collect();
308
309 let mut edge_results = Vec::new();
310 for handle in edge_handles {
311 if let Ok(result) = handle.await {
312 edge_results.push(result);
313 }
314 }
315
316 let mut edge_responses = Vec::new();
317 for (member_kref, edges) in edge_results {
318 for edge in edges {
319 let target_item_kref = edge
320 .target_kref
321 .split('?')
322 .next()
323 .unwrap_or(&edge.target_kref);
324 if target_item_kref == member_kref {
326 continue;
327 }
328 if member_kref_set.contains(target_item_kref)
329 || member_krefs
330 .iter()
331 .any(|mk| edge.target_kref.starts_with(mk))
332 {
333 edge_responses.push(TeamEdgeResponse {
334 from_kref: member_kref.clone(),
335 to_kref: target_item_kref.to_string(),
336 edge_type: edge.edge_type,
337 });
338 }
339 }
340 }
341
342 Ok(TeamResponse {
343 kref: bundle_kref.to_string(),
344 name: name.to_string(),
345 description: description.to_string(),
346 deprecated,
347 created_at,
348 member_count: None,
349 member_names: None,
350 edge_count: None,
351 members: member_responses,
352 edges: edge_responses,
353 })
354}
355
356#[derive(Debug, Serialize)]
359struct TeamValidationError {
360 code: String,
361 message: String,
362}
363
364fn validate_team_edges(members: &[String], edges: &[TeamEdgeBody]) -> Vec<TeamValidationError> {
367 let mut errors = Vec::new();
368 let member_set: HashSet<&str> = members.iter().map(|s| s.as_str()).collect();
369
370 for edge in edges {
372 if edge.from_kref == edge.to_kref {
373 errors.push(TeamValidationError {
374 code: "self_edge".into(),
375 message: format!(
376 "Self-referencing edge: {} ({}).",
377 &edge.from_kref, edge.edge_type
378 ),
379 });
380 }
381 }
382
383 for edge in edges {
385 if !member_set.contains(edge.from_kref.as_str()) {
386 errors.push(TeamValidationError {
387 code: "dangling_ref".into(),
388 message: format!("Edge from_kref not a team member: {}", &edge.from_kref),
389 });
390 }
391 if !member_set.contains(edge.to_kref.as_str()) {
392 errors.push(TeamValidationError {
393 code: "dangling_ref".into(),
394 message: format!("Edge to_kref not a team member: {}", &edge.to_kref),
395 });
396 }
397 }
398
399 let mut depends_pairs: HashSet<(&str, &str)> = HashSet::new();
401 for edge in edges {
402 let et = edge.edge_type.to_uppercase();
403 if et == "DEPENDS_ON" {
404 let pair = (edge.from_kref.as_str(), edge.to_kref.as_str());
405 let reverse = (pair.1, pair.0);
406 if depends_pairs.contains(&reverse) {
407 errors.push(TeamValidationError {
408 code: "reciprocal_depends".into(),
409 message: format!(
410 "Reciprocal DEPENDS_ON between {} and {}. Pick one direction.",
411 &edge.from_kref, &edge.to_kref,
412 ),
413 });
414 }
415 depends_pairs.insert(pair);
416 }
417 }
418
419 let execution_types: HashSet<&str> = ["DEPENDS_ON", "SUPPORTS", "FEEDS_INTO"]
421 .into_iter()
422 .collect();
423
424 let mut in_degree: HashMap<&str, usize> = HashMap::new();
426 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
427 for m in members {
428 in_degree.insert(m.as_str(), 0);
429 adj.insert(m.as_str(), Vec::new());
430 }
431
432 for edge in edges {
433 let et = edge.edge_type.to_uppercase();
434 if !execution_types.contains(et.as_str()) {
435 continue;
436 }
437 let from = edge.from_kref.as_str();
438 let to = edge.to_kref.as_str();
439 if !member_set.contains(from) || !member_set.contains(to) {
440 continue;
441 }
442
443 if et == "DEPENDS_ON" {
444 adj.entry(to).or_default().push(from);
446 *in_degree.entry(from).or_default() += 1;
447 } else {
448 adj.entry(from).or_default().push(to);
450 *in_degree.entry(to).or_default() += 1;
451 }
452 }
453
454 let mut queue: VecDeque<&str> = in_degree
455 .iter()
456 .filter(|&(_, deg)| *deg == 0)
457 .map(|(&k, _)| k)
458 .collect();
459
460 let mut visited = 0usize;
461 while let Some(node) = queue.pop_front() {
462 visited += 1;
463 for &dep in adj.get(node).unwrap_or(&Vec::new()) {
464 if let Some(deg) = in_degree.get_mut(dep) {
465 *deg -= 1;
466 if *deg == 0 {
467 queue.push_back(dep);
468 }
469 }
470 }
471 }
472
473 if visited < members.len() {
474 let cycle_members: Vec<&str> = in_degree
475 .iter()
476 .filter(|&(_, deg)| *deg > 0)
477 .map(|(&k, _)| k)
478 .collect();
479 errors.push(TeamValidationError {
480 code: "cycle".into(),
481 message: format!(
482 "Dependency cycle detected among {} member(s). Break the cycle by removing or reversing an edge.",
483 cycle_members.len(),
484 ),
485 });
486 }
487
488 errors
489}
490
491pub async fn handle_list_teams(
498 State(state): State<AppState>,
499 headers: HeaderMap,
500 Query(query): Query<TeamListQuery>,
501) -> impl IntoResponse {
502 if let Err(e) = require_auth(&state, &headers) {
503 return e.into_response();
504 }
505
506 let client = build_kumiho_client(&state);
507 let project_name = team_project(&state);
508 let space_path = team_space_path(&state);
509
510 let items = match client.list_teams_in(&space_path, query.include_deprecated).await {
511 Ok(items) => items,
512 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
513 let _ = client.ensure_project(&project_name).await;
514 let _ = client.ensure_space(&project_name, TEAM_SPACE_NAME).await;
515 return Json(serde_json::json!({ "teams": [], "total_count": 0, "page": 1, "per_page": 9 })).into_response();
516 }
517 Err(ref e) if matches!(e, KumihoError::Api { status: 500, .. }) => {
518 tracing::warn!("Teams list failed (Kumiho 500, likely corrupted data): {e}");
519 return Json(serde_json::json!({ "teams": [], "total_count": 0, "page": 1, "per_page": 9, "warning": "Kumiho returned a server error." })).into_response();
520 }
521 Err(e) => return kumiho_err(e).into_response(),
522 };
523
524 let total_count = items.len() as u32;
525 let per_page = query.per_page.unwrap_or(9).min(50).max(1);
526 let page = query.page.unwrap_or(1).max(1);
527 let skip = ((page - 1) * per_page) as usize;
528
529 let teams: Vec<TeamResponse> = items
531 .iter()
532 .skip(skip)
533 .take(per_page as usize)
534 .map(|item| {
535 let member_count = item
536 .metadata
537 .get("member_count")
538 .and_then(|v| v.parse::<u32>().ok());
539 let member_names = item.metadata.get("member_names").map(|v| {
540 v.split(',')
541 .map(|s| s.trim().to_string())
542 .filter(|s| !s.is_empty())
543 .collect::<Vec<_>>()
544 });
545 let edge_count = item
546 .metadata
547 .get("edge_count")
548 .and_then(|v| v.parse::<u32>().ok());
549
550 TeamResponse {
551 kref: item.kref.clone(),
552 name: item.item_name.clone(),
553 description: item
554 .metadata
555 .get("description")
556 .cloned()
557 .unwrap_or_default(),
558 deprecated: item.deprecated,
559 created_at: item.created_at.clone(),
560 members: Vec::new(),
561 edges: Vec::new(),
562 member_count,
563 member_names,
564 edge_count,
565 }
566 })
567 .collect();
568
569 Json(serde_json::json!({
570 "teams": teams,
571 "total_count": total_count,
572 "page": page,
573 "per_page": per_page,
574 }))
575 .into_response()
576}
577
578pub async fn handle_create_team(
580 State(state): State<AppState>,
581 headers: HeaderMap,
582 Json(body): Json<CreateTeamBody>,
583) -> impl IntoResponse {
584 if let Err(e) = require_auth(&state, &headers) {
585 return e.into_response();
586 }
587
588 let client = build_kumiho_client(&state);
589 let project_name = team_project(&state);
590 let space_path = team_space_path(&state);
591
592 let (proj_res, space_res) = tokio::join!(
594 client.ensure_project(&project_name),
595 client.ensure_space(&project_name, TEAM_SPACE_NAME),
596 );
597 if let Err(e) = proj_res {
598 return kumiho_err(e).into_response();
599 }
600 if let Err(e) = space_res {
601 return kumiho_err(e).into_response();
602 }
603
604 let validation_errors = validate_team_edges(&body.members, &body.edges);
606 if !validation_errors.is_empty() {
607 return (
608 StatusCode::BAD_REQUEST,
609 Json(serde_json::json!({
610 "error": "Team graph is invalid",
611 "validation_errors": validation_errors,
612 })),
613 )
614 .into_response();
615 }
616
617 let mut metadata = HashMap::new();
619 if let Some(ref desc) = body.description {
620 metadata.insert("description".to_string(), desc.clone());
621 }
622
623 let bundle = match client
625 .create_bundle(&space_path, &body.name, metadata)
626 .await
627 {
628 Ok(b) => b,
629 Err(e) => return kumiho_err(e).into_response(),
630 };
631
632 let member_handles: Vec<_> = body
634 .members
635 .iter()
636 .map(|member_kref| {
637 let client = client.clone();
638 let bundle_kref = bundle.kref.clone();
639 let member_kref = member_kref.clone();
640 tokio::spawn(async move {
641 client
642 .add_bundle_member(&bundle_kref, &member_kref, HashMap::new())
643 .await
644 })
645 })
646 .collect();
647 for handle in member_handles {
648 match handle.await {
649 Ok(Ok(_)) => {}
650 Ok(Err(e)) => return kumiho_err(e).into_response(),
651 Err(_) => return kumiho_err(KumihoError::Decode("task failed".into())).into_response(),
652 }
653 }
654
655 if !body.edges.is_empty() {
657 let edge_krefs: Vec<String> = body
659 .edges
660 .iter()
661 .flat_map(|e| [e.from_kref.clone(), e.to_kref.clone()])
662 .collect::<std::collections::HashSet<_>>()
663 .into_iter()
664 .collect();
665
666 let rev_map = client
668 .batch_get_revisions(&edge_krefs, "published")
669 .await
670 .unwrap_or_default();
671 let missing: Vec<String> = edge_krefs
672 .iter()
673 .filter(|k| !rev_map.contains_key(*k))
674 .cloned()
675 .collect();
676 let latest_map = if !missing.is_empty() {
677 client
678 .batch_get_revisions(&missing, "latest")
679 .await
680 .unwrap_or_default()
681 } else {
682 HashMap::new()
683 };
684
685 let edge_handles: Vec<_> = body
687 .edges
688 .iter()
689 .filter(|edge| edge.from_kref != edge.to_kref)
690 .filter_map(|edge| {
691 let source = rev_map
692 .get(&edge.from_kref)
693 .or_else(|| latest_map.get(&edge.from_kref));
694 let target = rev_map
695 .get(&edge.to_kref)
696 .or_else(|| latest_map.get(&edge.to_kref));
697 match (source, target) {
698 (Some(s), Some(t)) => {
699 let client = client.clone();
700 let src_kref = s.kref.clone();
701 let tgt_kref = t.kref.clone();
702 let edge_type = edge.edge_type.clone();
703 Some(tokio::spawn(async move {
704 client
705 .create_edge(&src_kref, &tgt_kref, &edge_type, HashMap::new())
706 .await
707 }))
708 }
709 _ => None,
710 }
711 })
712 .collect();
713 for handle in edge_handles {
714 if let Ok(Err(e)) = handle.await {
715 return kumiho_err(e).into_response();
716 }
717 }
718 }
719
720 let member_names: Vec<String> = body
722 .members
723 .iter()
724 .map(|k| {
725 k.rsplit('/')
726 .next()
727 .and_then(|s| s.split('.').next())
728 .unwrap_or("")
729 .to_string()
730 })
731 .collect();
732 let mut summary_meta = HashMap::new();
733 summary_meta.insert("member_count".to_string(), body.members.len().to_string());
734 summary_meta.insert("member_names".to_string(), member_names.join(","));
735 summary_meta.insert("edge_count".to_string(), body.edges.len().to_string());
736 if let Some(ref desc) = body.description {
737 summary_meta.insert("description".to_string(), desc.clone());
738 }
739 if let Ok(rev) = client.create_revision(&bundle.kref, summary_meta).await {
740 let _ = client.tag_revision(&rev.kref, "published").await;
741 }
742
743 let description = body.description.as_deref().unwrap_or("");
745 match build_team_response(
746 &client,
747 &bundle.kref,
748 &bundle.item_name,
749 description,
750 bundle.deprecated,
751 bundle.created_at.clone(),
752 )
753 .await
754 {
755 Ok(team) => {
756 invalidate_team_cache();
757 (
758 StatusCode::CREATED,
759 Json(serde_json::json!({ "team": team })),
760 )
761 .into_response()
762 }
763 Err(e) => kumiho_err(e).into_response(),
764 }
765}
766
767pub async fn handle_get_team(
769 State(state): State<AppState>,
770 headers: HeaderMap,
771 Path(kref): Path<String>,
772) -> impl IntoResponse {
773 if let Err(e) = require_auth(&state, &headers) {
774 return e.into_response();
775 }
776
777 let kref = normalize_kref(&kref);
778 let client = build_kumiho_client(&state);
779
780 let bundle = match client.get_bundle(&kref).await {
781 Ok(b) => b,
782 Err(e) => return kumiho_err(e).into_response(),
783 };
784
785 let description = bundle
786 .metadata
787 .get("description")
788 .cloned()
789 .unwrap_or_default();
790 match build_team_response(
791 &client,
792 &bundle.kref,
793 &bundle.item_name,
794 &description,
795 bundle.deprecated,
796 bundle.created_at.clone(),
797 )
798 .await
799 {
800 Ok(team) => Json(serde_json::json!({ "team": team })).into_response(),
801 Err(e) => kumiho_err(e).into_response(),
802 }
803}
804
805pub async fn handle_update_team(
807 State(state): State<AppState>,
808 headers: HeaderMap,
809 Path(kref): Path<String>,
810 Json(body): Json<CreateTeamBody>,
811) -> impl IntoResponse {
812 if let Err(e) = require_auth(&state, &headers) {
813 return e.into_response();
814 }
815
816 let kref = normalize_kref(&kref);
817 let client = build_kumiho_client(&state);
818
819 let bundle = match client.get_bundle(&kref).await {
821 Ok(b) => b,
822 Err(e) => return kumiho_err(e).into_response(),
823 };
824
825 let validation_errors = validate_team_edges(&body.members, &body.edges);
827 if !validation_errors.is_empty() {
828 return (
829 StatusCode::BAD_REQUEST,
830 Json(serde_json::json!({
831 "error": "Team graph is invalid",
832 "validation_errors": validation_errors,
833 })),
834 )
835 .into_response();
836 }
837
838 let member_names: Vec<String> = body
840 .members
841 .iter()
842 .map(|k| {
843 k.rsplit('/')
844 .next()
845 .and_then(|s| s.split('.').next())
846 .unwrap_or("")
847 .to_string()
848 })
849 .collect();
850 let mut metadata = HashMap::new();
851 metadata.insert("name".to_string(), body.name.clone());
852 if let Some(ref desc) = body.description {
853 metadata.insert("description".to_string(), desc.clone());
854 }
855 metadata.insert("member_count".to_string(), body.members.len().to_string());
856 metadata.insert("member_names".to_string(), member_names.join(","));
857 metadata.insert("edge_count".to_string(), body.edges.len().to_string());
858 if let Ok(rev) = client.create_revision(&kref, metadata).await {
859 let _ = client.tag_revision(&rev.kref, "published").await;
860 }
861
862 let current_members = match client.list_bundle_members(&kref).await {
864 Ok(m) => m,
865 Err(e) => return kumiho_err(e).into_response(),
866 };
867
868 let current_krefs: Vec<String> = current_members
869 .members
870 .iter()
871 .map(|m| m.item_kref.clone())
872 .collect();
873 let desired_krefs: Vec<String> = body.members.clone();
874
875 let to_add: Vec<_> = desired_krefs
877 .iter()
878 .filter(|k| !current_krefs.contains(k))
879 .cloned()
880 .collect();
881 let to_remove: Vec<_> = current_krefs
882 .iter()
883 .filter(|k| !desired_krefs.contains(k))
884 .cloned()
885 .collect();
886
887 let add_handles: Vec<_> = to_add
888 .iter()
889 .map(|member_kref| {
890 let client = client.clone();
891 let bundle_kref = kref.clone();
892 let member_kref = member_kref.clone();
893 tokio::spawn(async move {
894 client
895 .add_bundle_member(&bundle_kref, &member_kref, HashMap::new())
896 .await
897 })
898 })
899 .collect();
900 let remove_handles: Vec<_> = to_remove
901 .iter()
902 .map(|member_kref| {
903 let client = client.clone();
904 let bundle_kref = kref.clone();
905 let member_kref = member_kref.clone();
906 tokio::spawn(async move {
907 client
908 .remove_bundle_member(&bundle_kref, &member_kref)
909 .await
910 })
911 })
912 .collect();
913
914 for handle in add_handles {
915 if let Ok(Err(e)) = handle.await {
916 return kumiho_err(e).into_response();
917 }
918 }
919 for handle in remove_handles {
920 if let Ok(Err(e)) = handle.await {
921 return kumiho_err(e).into_response();
922 }
923 }
924
925 let all_member_krefs: Vec<String> = current_krefs
927 .iter()
928 .chain(desired_krefs.iter())
929 .cloned()
930 .collect::<std::collections::HashSet<_>>()
931 .into_iter()
932 .collect();
933
934 let rev_map = client
935 .batch_get_revisions(&all_member_krefs, "published")
936 .await
937 .unwrap_or_default();
938 let missing: Vec<String> = all_member_krefs
939 .iter()
940 .filter(|k| !rev_map.contains_key(*k))
941 .cloned()
942 .collect();
943 let latest_map = if !missing.is_empty() {
944 client
945 .batch_get_revisions(&missing, "latest")
946 .await
947 .unwrap_or_default()
948 } else {
949 HashMap::new()
950 };
951
952 let delete_handles: Vec<_> = all_member_krefs
954 .iter()
955 .filter_map(|mk| {
956 let rev = rev_map.get(mk).or_else(|| latest_map.get(mk));
957 rev.map(|r| {
958 let client = client.clone();
959 let rev_kref = r.kref.clone();
960 tokio::spawn(async move {
961 if let Ok(edges) = client.list_edges(&rev_kref, None, Some("outgoing")).await {
962 for edge in edges {
963 let _ = client
964 .delete_edge(&edge.source_kref, &edge.target_kref, &edge.edge_type)
965 .await;
966 }
967 }
968 })
969 })
970 })
971 .collect();
972 for handle in delete_handles {
973 let _ = handle.await;
974 }
975
976 if !body.edges.is_empty() {
978 let edge_handles: Vec<_> = body
979 .edges
980 .iter()
981 .filter(|edge| edge.from_kref != edge.to_kref)
982 .filter_map(|edge| {
983 let source = rev_map
984 .get(&edge.from_kref)
985 .or_else(|| latest_map.get(&edge.from_kref));
986 let target = rev_map
987 .get(&edge.to_kref)
988 .or_else(|| latest_map.get(&edge.to_kref));
989 match (source, target) {
990 (Some(s), Some(t)) => {
991 let client = client.clone();
992 let src_kref = s.kref.clone();
993 let tgt_kref = t.kref.clone();
994 let edge_type = edge.edge_type.clone();
995 Some(tokio::spawn(async move {
996 client
997 .create_edge(&src_kref, &tgt_kref, &edge_type, HashMap::new())
998 .await
999 }))
1000 }
1001 _ => None,
1002 }
1003 })
1004 .collect();
1005 for handle in edge_handles {
1006 if let Ok(Err(e)) = handle.await {
1007 return kumiho_err(e).into_response();
1008 }
1009 }
1010 }
1011
1012 let description = body.description.as_deref().unwrap_or("");
1014 match build_team_response(
1015 &client,
1016 &kref,
1017 &body.name,
1018 description,
1019 bundle.deprecated,
1020 bundle.created_at.clone(),
1021 )
1022 .await
1023 {
1024 Ok(team) => {
1025 invalidate_team_cache();
1026 Json(serde_json::json!({ "team": team })).into_response()
1027 }
1028 Err(e) => kumiho_err(e).into_response(),
1029 }
1030}
1031
1032pub async fn handle_delete_team(
1034 State(state): State<AppState>,
1035 headers: HeaderMap,
1036 Path(kref): Path<String>,
1037) -> impl IntoResponse {
1038 if let Err(e) = require_auth(&state, &headers) {
1039 return e.into_response();
1040 }
1041
1042 let kref = normalize_kref(&kref);
1043 let client = build_kumiho_client(&state);
1044
1045 if let Ok(members_resp) = client.list_bundle_members(&kref).await {
1047 for member in &members_resp.members {
1048 if let Ok(rev) = client.get_published_or_latest(&member.item_kref).await {
1049 if let Ok(edges) = client.list_edges(&rev.kref, None, Some("outgoing")).await {
1050 for edge in edges {
1051 let _ = client
1052 .delete_edge(&edge.source_kref, &edge.target_kref, &edge.edge_type)
1053 .await;
1054 }
1055 }
1056 }
1057 }
1058
1059 for member in &members_resp.members {
1061 let _ = client.remove_bundle_member(&kref, &member.item_kref).await;
1062 }
1063 }
1064
1065 match client.delete_bundle(&kref).await {
1067 Ok(()) => {
1068 invalidate_team_cache();
1069 StatusCode::NO_CONTENT.into_response()
1070 }
1071 Err(e) => kumiho_err(e).into_response(),
1072 }
1073}
1074
1075pub async fn handle_deprecate_team(
1077 State(state): State<AppState>,
1078 headers: HeaderMap,
1079 Json(body): Json<DeprecateBody>,
1080) -> impl IntoResponse {
1081 if let Err(e) = require_auth(&state, &headers) {
1082 return e.into_response();
1083 }
1084
1085 let kref = body.kref.clone();
1086 let client = build_kumiho_client(&state);
1087
1088 match client.deprecate_team(&kref, body.deprecated).await {
1089 Ok(()) => {
1090 invalidate_team_cache();
1091 Json(serde_json::json!({ "success": true })).into_response()
1092 }
1093 Err(e) => kumiho_err(e).into_response(),
1094 }
1095}