1use super::AppState;
9use super::api::require_auth;
10use super::api_agents::build_kumiho_client;
11use super::kumiho_client::{KumihoClient, KumihoError};
12
/// Returns `raw` with a leading `kref://` scheme.
///
/// One existing leading `kref://` is stripped before the scheme is prepended,
/// so input that already carries the scheme comes back unchanged.
fn normalize_kref(raw: &str) -> String {
    const SCHEME: &str = "kref://";
    let tail = match raw.strip_prefix(SCHEME) {
        Some(rest) => rest,
        None => raw,
    };
    let mut canonical = String::with_capacity(SCHEME.len() + tail.len());
    canonical.push_str(SCHEME);
    canonical.push_str(tail);
    canonical
}
18use axum::{
19 extract::{Path, Query, State},
20 http::{HeaderMap, StatusCode},
21 response::{IntoResponse, Json},
22};
23use parking_lot::Mutex;
24use serde::{Deserialize, Serialize};
25use std::collections::{HashMap, HashSet, VecDeque};
26use std::sync::OnceLock;
27use std::time::Instant;
28
/// Snapshot of a team listing held in the process-wide `TEAM_CACHE`.
struct TeamCache {
    /// Cached team entries; cloned out on every cache hit.
    teams: Vec<TeamResponse>,
    /// The `include_deprecated` flag the snapshot was stored with; a lookup
    /// with a different flag is treated as a miss.
    include_deprecated: bool,
    /// When the snapshot was stored; its age is compared against `CACHE_TTL_SECS`.
    fetched_at: Instant,
}
36
/// Process-wide slot holding at most one team-listing snapshot. The mutex is
/// created lazily by the first reader or writer.
static TEAM_CACHE: OnceLock<Mutex<Option<TeamCache>>> = OnceLock::new();
/// Maximum age, in whole seconds, at which a cached snapshot is still served.
const CACHE_TTL_SECS: u64 = 30;
39
40fn get_cached_teams(include_deprecated: bool) -> Option<Vec<TeamResponse>> {
41 let lock = TEAM_CACHE.get_or_init(|| Mutex::new(None));
42 let cache = lock.lock();
43 if let Some(ref c) = *cache {
44 if c.include_deprecated == include_deprecated
45 && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
46 {
47 return Some(c.teams.clone());
48 }
49 }
50 None
51}
52
53fn set_cached_teams(teams: &[TeamResponse], include_deprecated: bool) {
54 let lock = TEAM_CACHE.get_or_init(|| Mutex::new(None));
55 let mut cache = lock.lock();
56 *cache = Some(TeamCache {
57 teams: teams.to_vec(),
58 include_deprecated,
59 fetched_at: Instant::now(),
60 });
61}
62
63pub fn invalidate_team_cache() {
64 if let Some(lock) = TEAM_CACHE.get() {
65 let mut cache = lock.lock();
66 *cache = None;
67 }
68}
69
/// Name of the space, inside the harness project, in which team bundles are
/// created and listed.
const TEAM_SPACE_NAME: &str = "Teams";
72
73fn team_project(state: &AppState) -> String {
75 state.config.lock().kumiho.harness_project.clone()
76}
77
78fn team_space_path(state: &AppState) -> String {
80 format!("/{}/{}", team_project(state), TEAM_SPACE_NAME)
81}
82
/// Query-string parameters accepted by `handle_list_teams`.
#[derive(Deserialize)]
pub struct TeamListQuery {
    /// Forwarded to the Kumiho team listing call; `false` when omitted.
    #[serde(default)]
    pub include_deprecated: bool,
    /// Requested page number; `handle_list_teams` defaults it to 1 and raises
    /// smaller values to 1.
    pub page: Option<u32>,
    /// Requested page size; `handle_list_teams` defaults it to 9 and limits it
    /// to the range 1..=50.
    pub per_page: Option<u32>,
}
95
/// JSON body shared by `handle_create_team` and `handle_update_team`.
#[derive(Deserialize)]
pub struct CreateTeamBody {
    /// Team name: used as the bundle name on create and stored under the
    /// `name` metadata key on update.
    pub name: String,
    /// Optional description, stored under the `description` metadata key.
    pub description: Option<String>,
    /// Krefs of the items that should be members of the team; empty when omitted.
    #[serde(default)]
    pub members: Vec<String>,
    /// Edges to create between members; empty when omitted.
    #[serde(default)]
    pub edges: Vec<TeamEdgeBody>,
}
105
/// One requested edge between two team members.
#[derive(Deserialize)]
pub struct TeamEdgeBody {
    /// Kref of the member the edge starts at.
    pub from_kref: String,
    /// Kref of the member the edge points to.
    pub to_kref: String,
    /// Edge type label. `validate_team_edges` upper-cases it and gives special
    /// treatment to `DEPENDS_ON`, `SUPPORTS` and `FEEDS_INTO`.
    pub edge_type: String,
}
112
/// JSON body accepted by `handle_deprecate_team`.
#[derive(Deserialize)]
pub struct DeprecateBody {
    /// Kref of the team whose deprecation flag is being changed.
    pub kref: String,
    /// Flag value passed to the client's `deprecate_team` call.
    pub deprecated: bool,
}
118
/// Team representation serialized by the handlers in this file.
///
/// `handle_list_teams` fills only the summary fields (`member_count`,
/// `member_names`, `edge_count`) and leaves `members`/`edges` empty, while
/// `build_team_response` fills `members`/`edges` and leaves the summary
/// fields as `None`.
#[derive(Debug, Serialize, Clone)]
pub struct TeamResponse {
    /// Kref of the team bundle.
    pub kref: String,
    /// Team name.
    pub name: String,
    /// Team description; empty when none is stored.
    pub description: String,
    /// Deprecation flag as reported for the bundle.
    pub deprecated: bool,
    /// Creation timestamp string as reported for the bundle, if any.
    pub created_at: Option<String>,
    /// Detailed member entries.
    pub members: Vec<TeamMemberResponse>,
    /// Edges between members.
    pub edges: Vec<TeamEdgeResponse>,
    /// Summary value parsed from the `member_count` metadata key; omitted from
    /// the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub member_count: Option<u32>,
    /// Summary value split from the comma-separated `member_names` metadata
    /// key; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub member_names: Option<Vec<String>>,
    /// Summary value parsed from the `edge_count` metadata key; omitted from
    /// the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub edge_count: Option<u32>,
}
138
/// One team member as assembled by `build_team_response`.
#[derive(Debug, Serialize, Clone)]
pub struct TeamMemberResponse {
    /// Kref of the member item.
    pub kref: String,
    /// Display name derived from the last path segment of the kref.
    pub name: String,
    /// Value of the `role` key in the member revision's metadata; empty when absent.
    pub role: String,
    /// Value of the `agent_type` metadata key; empty when absent.
    pub agent_type: String,
    /// Value of the `identity` metadata key; empty when absent.
    pub identity: String,
    /// Entries split from the comma-separated `expertise` metadata key.
    pub expertise: Vec<String>,
}
148
/// One edge between two team members as assembled by `build_team_response`.
#[derive(Debug, Serialize, Clone)]
pub struct TeamEdgeResponse {
    /// Kref of the member the edge starts at.
    pub from_kref: String,
    /// Target kref with any `?...` suffix removed.
    pub to_kref: String,
    /// Edge type label as reported by the edge listing.
    pub edge_type: String,
}
155
156fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
160 match &e {
161 KumihoError::Unreachable(_) => (
162 StatusCode::SERVICE_UNAVAILABLE,
163 Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
164 ),
165 KumihoError::Api { status, body } => {
166 let code = if *status == 401 || *status == 403 {
167 StatusCode::BAD_GATEWAY
168 } else {
169 StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
170 };
171 (
172 code,
173 Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
174 )
175 }
176 KumihoError::Decode(msg) => (
177 StatusCode::BAD_GATEWAY,
178 Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
179 ),
180 }
181}
182
183async fn build_team_response(
186 client: &KumihoClient,
187 bundle_kref: &str,
188 name: &str,
189 description: &str,
190 deprecated: bool,
191 created_at: Option<String>,
192) -> Result<TeamResponse, KumihoError> {
193 let members_resp = match client.list_bundle_members(bundle_kref).await {
195 Ok(resp) => resp,
196 Err(_) => {
197 return Ok(TeamResponse {
199 kref: bundle_kref.to_string(),
200 name: name.to_string(),
201 description: description.to_string(),
202 deprecated,
203 created_at,
204 members: Vec::new(),
205 edges: Vec::new(),
206 member_count: None,
207 member_names: None,
208 edge_count: None,
209 });
210 }
211 };
212
213 let member_krefs: Vec<String> = members_resp
215 .members
216 .iter()
217 .map(|m| m.item_kref.clone())
218 .collect();
219
220 let rev_map = client
222 .batch_get_revisions(&member_krefs, "published")
223 .await
224 .unwrap_or_default();
225 let missing: Vec<String> = member_krefs
226 .iter()
227 .filter(|k| !rev_map.contains_key(*k))
228 .cloned()
229 .collect();
230 let latest_map = if !missing.is_empty() {
231 client
232 .batch_get_revisions(&missing, "latest")
233 .await
234 .unwrap_or_default()
235 } else {
236 std::collections::HashMap::new()
237 };
238
239 let mut member_responses = Vec::with_capacity(member_krefs.len());
240 for member_kref in &member_krefs {
241 let rev = rev_map
242 .get(member_kref)
243 .or_else(|| latest_map.get(member_kref));
244 let meta = rev.map(|r| &r.metadata);
245 let get =
246 |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
247
248 let expertise_str = get("expertise");
249 let expertise: Vec<String> = if expertise_str.is_empty() {
250 Vec::new()
251 } else {
252 expertise_str
253 .split(',')
254 .map(|s| s.trim().to_string())
255 .collect()
256 };
257
258 let item_name = {
259 let name_from_kref = member_kref
260 .rsplit('/')
261 .next()
262 .and_then(|s| s.split('.').next())
263 .unwrap_or("")
264 .to_string();
265 if name_from_kref.is_empty() {
266 member_kref.clone()
267 } else {
268 name_from_kref
269 }
270 };
271
272 member_responses.push(TeamMemberResponse {
273 kref: member_kref.clone(),
274 name: item_name,
275 role: get("role"),
276 agent_type: get("agent_type"),
277 identity: get("identity"),
278 expertise,
279 });
280 }
281
282 let member_kref_set: HashSet<String> = member_krefs.iter().cloned().collect();
286 let edge_handles: Vec<_> = member_krefs
287 .iter()
288 .filter_map(|kref| {
289 let rev = rev_map.get(kref).or_else(|| latest_map.get(kref));
290 rev.map(|r| {
291 let client = client.clone();
292 let item_kref = kref.clone();
293 let rev_kref = r.kref.clone();
294 tokio::spawn(async move {
295 let edges = tokio::time::timeout(
296 std::time::Duration::from_secs(8),
297 client.list_edges(&rev_kref, None, Some("outgoing")),
298 )
299 .await
300 .ok()
301 .and_then(|r| r.ok())
302 .unwrap_or_default();
303 (item_kref, edges)
304 })
305 })
306 })
307 .collect();
308
309 let mut edge_results = Vec::new();
310 for handle in edge_handles {
311 if let Ok(result) = handle.await {
312 edge_results.push(result);
313 }
314 }
315
316 let mut edge_responses = Vec::new();
317 for (member_kref, edges) in edge_results {
318 for edge in edges {
319 let target_item_kref = edge
320 .target_kref
321 .split('?')
322 .next()
323 .unwrap_or(&edge.target_kref);
324 if target_item_kref == member_kref {
326 continue;
327 }
328 if member_kref_set.contains(target_item_kref)
329 || member_krefs
330 .iter()
331 .any(|mk| edge.target_kref.starts_with(mk))
332 {
333 edge_responses.push(TeamEdgeResponse {
334 from_kref: member_kref.clone(),
335 to_kref: target_item_kref.to_string(),
336 edge_type: edge.edge_type,
337 });
338 }
339 }
340 }
341
342 Ok(TeamResponse {
343 kref: bundle_kref.to_string(),
344 name: name.to_string(),
345 description: description.to_string(),
346 deprecated,
347 created_at,
348 member_count: None,
349 member_names: None,
350 edge_count: None,
351 members: member_responses,
352 edges: edge_responses,
353 })
354}
355
/// One problem found by `validate_team_edges`; serialized into the
/// `validation_errors` array of the 400 response.
#[derive(Debug, Serialize)]
struct TeamValidationError {
    /// Error code: `self_edge`, `dangling_ref`, `reciprocal_depends` or `cycle`.
    code: String,
    /// Human-readable explanation of the problem.
    message: String,
}
363
364fn validate_team_edges(members: &[String], edges: &[TeamEdgeBody]) -> Vec<TeamValidationError> {
367 let mut errors = Vec::new();
368 let member_set: HashSet<&str> = members.iter().map(|s| s.as_str()).collect();
369
370 for edge in edges {
372 if edge.from_kref == edge.to_kref {
373 errors.push(TeamValidationError {
374 code: "self_edge".into(),
375 message: format!(
376 "Self-referencing edge: {} ({}).",
377 &edge.from_kref, edge.edge_type
378 ),
379 });
380 }
381 }
382
383 for edge in edges {
385 if !member_set.contains(edge.from_kref.as_str()) {
386 errors.push(TeamValidationError {
387 code: "dangling_ref".into(),
388 message: format!("Edge from_kref not a team member: {}", &edge.from_kref),
389 });
390 }
391 if !member_set.contains(edge.to_kref.as_str()) {
392 errors.push(TeamValidationError {
393 code: "dangling_ref".into(),
394 message: format!("Edge to_kref not a team member: {}", &edge.to_kref),
395 });
396 }
397 }
398
399 let mut depends_pairs: HashSet<(&str, &str)> = HashSet::new();
401 for edge in edges {
402 let et = edge.edge_type.to_uppercase();
403 if et == "DEPENDS_ON" {
404 let pair = (edge.from_kref.as_str(), edge.to_kref.as_str());
405 let reverse = (pair.1, pair.0);
406 if depends_pairs.contains(&reverse) {
407 errors.push(TeamValidationError {
408 code: "reciprocal_depends".into(),
409 message: format!(
410 "Reciprocal DEPENDS_ON between {} and {}. Pick one direction.",
411 &edge.from_kref, &edge.to_kref,
412 ),
413 });
414 }
415 depends_pairs.insert(pair);
416 }
417 }
418
419 let execution_types: HashSet<&str> = ["DEPENDS_ON", "SUPPORTS", "FEEDS_INTO"]
421 .into_iter()
422 .collect();
423
424 let mut in_degree: HashMap<&str, usize> = HashMap::new();
426 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
427 for m in members {
428 in_degree.insert(m.as_str(), 0);
429 adj.insert(m.as_str(), Vec::new());
430 }
431
432 for edge in edges {
433 let et = edge.edge_type.to_uppercase();
434 if !execution_types.contains(et.as_str()) {
435 continue;
436 }
437 let from = edge.from_kref.as_str();
438 let to = edge.to_kref.as_str();
439 if !member_set.contains(from) || !member_set.contains(to) {
440 continue;
441 }
442
443 if et == "DEPENDS_ON" {
444 adj.entry(to).or_default().push(from);
446 *in_degree.entry(from).or_default() += 1;
447 } else {
448 adj.entry(from).or_default().push(to);
450 *in_degree.entry(to).or_default() += 1;
451 }
452 }
453
454 let mut queue: VecDeque<&str> = in_degree
455 .iter()
456 .filter(|&(_, deg)| *deg == 0)
457 .map(|(&k, _)| k)
458 .collect();
459
460 let mut visited = 0usize;
461 while let Some(node) = queue.pop_front() {
462 visited += 1;
463 for &dep in adj.get(node).unwrap_or(&Vec::new()) {
464 if let Some(deg) = in_degree.get_mut(dep) {
465 *deg -= 1;
466 if *deg == 0 {
467 queue.push_back(dep);
468 }
469 }
470 }
471 }
472
473 if visited < members.len() {
474 let cycle_members: Vec<&str> = in_degree
475 .iter()
476 .filter(|&(_, deg)| *deg > 0)
477 .map(|(&k, _)| k)
478 .collect();
479 errors.push(TeamValidationError {
480 code: "cycle".into(),
481 message: format!(
482 "Dependency cycle detected among {} member(s). Break the cycle by removing or reversing an edge.",
483 cycle_members.len(),
484 ),
485 });
486 }
487
488 errors
489}
490
491pub async fn handle_list_teams(
498 State(state): State<AppState>,
499 headers: HeaderMap,
500 Query(query): Query<TeamListQuery>,
501) -> impl IntoResponse {
502 if let Err(e) = require_auth(&state, &headers) {
503 return e.into_response();
504 }
505
506 let client = build_kumiho_client(&state);
507 let project_name = team_project(&state);
508 let space_path = team_space_path(&state);
509
510 let items = match client
511 .list_teams_in(&space_path, query.include_deprecated)
512 .await
513 {
514 Ok(items) => items,
515 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
516 let _ = client.ensure_project(&project_name).await;
517 let _ = client.ensure_space(&project_name, TEAM_SPACE_NAME).await;
518 return Json(
519 serde_json::json!({ "teams": [], "total_count": 0, "page": 1, "per_page": 9 }),
520 )
521 .into_response();
522 }
523 Err(ref e) if matches!(e, KumihoError::Api { status: 500, .. }) => {
524 tracing::warn!("Teams list failed (Kumiho 500, likely corrupted data): {e}");
525 return Json(serde_json::json!({ "teams": [], "total_count": 0, "page": 1, "per_page": 9, "warning": "Kumiho returned a server error." })).into_response();
526 }
527 Err(e) => return kumiho_err(e).into_response(),
528 };
529
530 let total_count = items.len() as u32;
531 let per_page = query.per_page.unwrap_or(9).min(50).max(1);
532 let page = query.page.unwrap_or(1).max(1);
533 let skip = ((page - 1) * per_page) as usize;
534
535 let teams: Vec<TeamResponse> = items
537 .iter()
538 .skip(skip)
539 .take(per_page as usize)
540 .map(|item| {
541 let member_count = item
542 .metadata
543 .get("member_count")
544 .and_then(|v| v.parse::<u32>().ok());
545 let member_names = item.metadata.get("member_names").map(|v| {
546 v.split(',')
547 .map(|s| s.trim().to_string())
548 .filter(|s| !s.is_empty())
549 .collect::<Vec<_>>()
550 });
551 let edge_count = item
552 .metadata
553 .get("edge_count")
554 .and_then(|v| v.parse::<u32>().ok());
555
556 TeamResponse {
557 kref: item.kref.clone(),
558 name: item.item_name.clone(),
559 description: item
560 .metadata
561 .get("description")
562 .cloned()
563 .unwrap_or_default(),
564 deprecated: item.deprecated,
565 created_at: item.created_at.clone(),
566 members: Vec::new(),
567 edges: Vec::new(),
568 member_count,
569 member_names,
570 edge_count,
571 }
572 })
573 .collect();
574
575 Json(serde_json::json!({
576 "teams": teams,
577 "total_count": total_count,
578 "page": page,
579 "per_page": per_page,
580 }))
581 .into_response()
582}
583
584pub async fn handle_create_team(
586 State(state): State<AppState>,
587 headers: HeaderMap,
588 Json(body): Json<CreateTeamBody>,
589) -> impl IntoResponse {
590 if let Err(e) = require_auth(&state, &headers) {
591 return e.into_response();
592 }
593
594 let client = build_kumiho_client(&state);
595 let project_name = team_project(&state);
596 let space_path = team_space_path(&state);
597
598 let (proj_res, space_res) = tokio::join!(
600 client.ensure_project(&project_name),
601 client.ensure_space(&project_name, TEAM_SPACE_NAME),
602 );
603 if let Err(e) = proj_res {
604 return kumiho_err(e).into_response();
605 }
606 if let Err(e) = space_res {
607 return kumiho_err(e).into_response();
608 }
609
610 let validation_errors = validate_team_edges(&body.members, &body.edges);
612 if !validation_errors.is_empty() {
613 return (
614 StatusCode::BAD_REQUEST,
615 Json(serde_json::json!({
616 "error": "Team graph is invalid",
617 "validation_errors": validation_errors,
618 })),
619 )
620 .into_response();
621 }
622
623 let mut metadata = HashMap::new();
625 if let Some(ref desc) = body.description {
626 metadata.insert("description".to_string(), desc.clone());
627 }
628
629 let bundle = match client
631 .create_bundle(&space_path, &body.name, metadata)
632 .await
633 {
634 Ok(b) => b,
635 Err(e) => return kumiho_err(e).into_response(),
636 };
637
638 let member_handles: Vec<_> = body
640 .members
641 .iter()
642 .map(|member_kref| {
643 let client = client.clone();
644 let bundle_kref = bundle.kref.clone();
645 let member_kref = member_kref.clone();
646 tokio::spawn(async move {
647 client
648 .add_bundle_member(&bundle_kref, &member_kref, HashMap::new())
649 .await
650 })
651 })
652 .collect();
653 for handle in member_handles {
654 match handle.await {
655 Ok(Ok(_)) => {}
656 Ok(Err(e)) => return kumiho_err(e).into_response(),
657 Err(_) => return kumiho_err(KumihoError::Decode("task failed".into())).into_response(),
658 }
659 }
660
661 if !body.edges.is_empty() {
663 let edge_krefs: Vec<String> = body
665 .edges
666 .iter()
667 .flat_map(|e| [e.from_kref.clone(), e.to_kref.clone()])
668 .collect::<std::collections::HashSet<_>>()
669 .into_iter()
670 .collect();
671
672 let rev_map = client
674 .batch_get_revisions(&edge_krefs, "published")
675 .await
676 .unwrap_or_default();
677 let missing: Vec<String> = edge_krefs
678 .iter()
679 .filter(|k| !rev_map.contains_key(*k))
680 .cloned()
681 .collect();
682 let latest_map = if !missing.is_empty() {
683 client
684 .batch_get_revisions(&missing, "latest")
685 .await
686 .unwrap_or_default()
687 } else {
688 HashMap::new()
689 };
690
691 let edge_handles: Vec<_> = body
693 .edges
694 .iter()
695 .filter(|edge| edge.from_kref != edge.to_kref)
696 .filter_map(|edge| {
697 let source = rev_map
698 .get(&edge.from_kref)
699 .or_else(|| latest_map.get(&edge.from_kref));
700 let target = rev_map
701 .get(&edge.to_kref)
702 .or_else(|| latest_map.get(&edge.to_kref));
703 match (source, target) {
704 (Some(s), Some(t)) => {
705 let client = client.clone();
706 let src_kref = s.kref.clone();
707 let tgt_kref = t.kref.clone();
708 let edge_type = edge.edge_type.clone();
709 Some(tokio::spawn(async move {
710 client
711 .create_edge(&src_kref, &tgt_kref, &edge_type, HashMap::new())
712 .await
713 }))
714 }
715 _ => None,
716 }
717 })
718 .collect();
719 for handle in edge_handles {
720 if let Ok(Err(e)) = handle.await {
721 return kumiho_err(e).into_response();
722 }
723 }
724 }
725
726 let member_names: Vec<String> = body
728 .members
729 .iter()
730 .map(|k| {
731 k.rsplit('/')
732 .next()
733 .and_then(|s| s.split('.').next())
734 .unwrap_or("")
735 .to_string()
736 })
737 .collect();
738 let mut summary_meta = HashMap::new();
739 summary_meta.insert("member_count".to_string(), body.members.len().to_string());
740 summary_meta.insert("member_names".to_string(), member_names.join(","));
741 summary_meta.insert("edge_count".to_string(), body.edges.len().to_string());
742 if let Some(ref desc) = body.description {
743 summary_meta.insert("description".to_string(), desc.clone());
744 }
745 if let Ok(rev) = client.create_revision(&bundle.kref, summary_meta).await {
746 let _ = client.tag_revision(&rev.kref, "published").await;
747 }
748
749 let description = body.description.as_deref().unwrap_or("");
751 match build_team_response(
752 &client,
753 &bundle.kref,
754 &bundle.item_name,
755 description,
756 bundle.deprecated,
757 bundle.created_at.clone(),
758 )
759 .await
760 {
761 Ok(team) => {
762 invalidate_team_cache();
763 (
764 StatusCode::CREATED,
765 Json(serde_json::json!({ "team": team })),
766 )
767 .into_response()
768 }
769 Err(e) => kumiho_err(e).into_response(),
770 }
771}
772
773pub async fn handle_get_team(
775 State(state): State<AppState>,
776 headers: HeaderMap,
777 Path(kref): Path<String>,
778) -> impl IntoResponse {
779 if let Err(e) = require_auth(&state, &headers) {
780 return e.into_response();
781 }
782
783 let kref = normalize_kref(&kref);
784 let client = build_kumiho_client(&state);
785
786 let bundle = match client.get_bundle(&kref).await {
787 Ok(b) => b,
788 Err(e) => return kumiho_err(e).into_response(),
789 };
790
791 let description = bundle
792 .metadata
793 .get("description")
794 .cloned()
795 .unwrap_or_default();
796 match build_team_response(
797 &client,
798 &bundle.kref,
799 &bundle.item_name,
800 &description,
801 bundle.deprecated,
802 bundle.created_at.clone(),
803 )
804 .await
805 {
806 Ok(team) => Json(serde_json::json!({ "team": team })).into_response(),
807 Err(e) => kumiho_err(e).into_response(),
808 }
809}
810
811pub async fn handle_update_team(
813 State(state): State<AppState>,
814 headers: HeaderMap,
815 Path(kref): Path<String>,
816 Json(body): Json<CreateTeamBody>,
817) -> impl IntoResponse {
818 if let Err(e) = require_auth(&state, &headers) {
819 return e.into_response();
820 }
821
822 let kref = normalize_kref(&kref);
823 let client = build_kumiho_client(&state);
824
825 let bundle = match client.get_bundle(&kref).await {
827 Ok(b) => b,
828 Err(e) => return kumiho_err(e).into_response(),
829 };
830
831 let validation_errors = validate_team_edges(&body.members, &body.edges);
833 if !validation_errors.is_empty() {
834 return (
835 StatusCode::BAD_REQUEST,
836 Json(serde_json::json!({
837 "error": "Team graph is invalid",
838 "validation_errors": validation_errors,
839 })),
840 )
841 .into_response();
842 }
843
844 let member_names: Vec<String> = body
846 .members
847 .iter()
848 .map(|k| {
849 k.rsplit('/')
850 .next()
851 .and_then(|s| s.split('.').next())
852 .unwrap_or("")
853 .to_string()
854 })
855 .collect();
856 let mut metadata = HashMap::new();
857 metadata.insert("name".to_string(), body.name.clone());
858 if let Some(ref desc) = body.description {
859 metadata.insert("description".to_string(), desc.clone());
860 }
861 metadata.insert("member_count".to_string(), body.members.len().to_string());
862 metadata.insert("member_names".to_string(), member_names.join(","));
863 metadata.insert("edge_count".to_string(), body.edges.len().to_string());
864 if let Ok(rev) = client.create_revision(&kref, metadata).await {
865 let _ = client.tag_revision(&rev.kref, "published").await;
866 }
867
868 let current_members = match client.list_bundle_members(&kref).await {
870 Ok(m) => m,
871 Err(e) => return kumiho_err(e).into_response(),
872 };
873
874 let current_krefs: Vec<String> = current_members
875 .members
876 .iter()
877 .map(|m| m.item_kref.clone())
878 .collect();
879 let desired_krefs: Vec<String> = body.members.clone();
880
881 let to_add: Vec<_> = desired_krefs
883 .iter()
884 .filter(|k| !current_krefs.contains(k))
885 .cloned()
886 .collect();
887 let to_remove: Vec<_> = current_krefs
888 .iter()
889 .filter(|k| !desired_krefs.contains(k))
890 .cloned()
891 .collect();
892
893 let add_handles: Vec<_> = to_add
894 .iter()
895 .map(|member_kref| {
896 let client = client.clone();
897 let bundle_kref = kref.clone();
898 let member_kref = member_kref.clone();
899 tokio::spawn(async move {
900 client
901 .add_bundle_member(&bundle_kref, &member_kref, HashMap::new())
902 .await
903 })
904 })
905 .collect();
906 let remove_handles: Vec<_> = to_remove
907 .iter()
908 .map(|member_kref| {
909 let client = client.clone();
910 let bundle_kref = kref.clone();
911 let member_kref = member_kref.clone();
912 tokio::spawn(async move {
913 client
914 .remove_bundle_member(&bundle_kref, &member_kref)
915 .await
916 })
917 })
918 .collect();
919
920 for handle in add_handles {
921 if let Ok(Err(e)) = handle.await {
922 return kumiho_err(e).into_response();
923 }
924 }
925 for handle in remove_handles {
926 if let Ok(Err(e)) = handle.await {
927 return kumiho_err(e).into_response();
928 }
929 }
930
931 let all_member_krefs: Vec<String> = current_krefs
933 .iter()
934 .chain(desired_krefs.iter())
935 .cloned()
936 .collect::<std::collections::HashSet<_>>()
937 .into_iter()
938 .collect();
939
940 let rev_map = client
941 .batch_get_revisions(&all_member_krefs, "published")
942 .await
943 .unwrap_or_default();
944 let missing: Vec<String> = all_member_krefs
945 .iter()
946 .filter(|k| !rev_map.contains_key(*k))
947 .cloned()
948 .collect();
949 let latest_map = if !missing.is_empty() {
950 client
951 .batch_get_revisions(&missing, "latest")
952 .await
953 .unwrap_or_default()
954 } else {
955 HashMap::new()
956 };
957
958 let delete_handles: Vec<_> = all_member_krefs
960 .iter()
961 .filter_map(|mk| {
962 let rev = rev_map.get(mk).or_else(|| latest_map.get(mk));
963 rev.map(|r| {
964 let client = client.clone();
965 let rev_kref = r.kref.clone();
966 tokio::spawn(async move {
967 if let Ok(edges) = client.list_edges(&rev_kref, None, Some("outgoing")).await {
968 for edge in edges {
969 let _ = client
970 .delete_edge(&edge.source_kref, &edge.target_kref, &edge.edge_type)
971 .await;
972 }
973 }
974 })
975 })
976 })
977 .collect();
978 for handle in delete_handles {
979 let _ = handle.await;
980 }
981
982 if !body.edges.is_empty() {
984 let edge_handles: Vec<_> = body
985 .edges
986 .iter()
987 .filter(|edge| edge.from_kref != edge.to_kref)
988 .filter_map(|edge| {
989 let source = rev_map
990 .get(&edge.from_kref)
991 .or_else(|| latest_map.get(&edge.from_kref));
992 let target = rev_map
993 .get(&edge.to_kref)
994 .or_else(|| latest_map.get(&edge.to_kref));
995 match (source, target) {
996 (Some(s), Some(t)) => {
997 let client = client.clone();
998 let src_kref = s.kref.clone();
999 let tgt_kref = t.kref.clone();
1000 let edge_type = edge.edge_type.clone();
1001 Some(tokio::spawn(async move {
1002 client
1003 .create_edge(&src_kref, &tgt_kref, &edge_type, HashMap::new())
1004 .await
1005 }))
1006 }
1007 _ => None,
1008 }
1009 })
1010 .collect();
1011 for handle in edge_handles {
1012 if let Ok(Err(e)) = handle.await {
1013 return kumiho_err(e).into_response();
1014 }
1015 }
1016 }
1017
1018 let description = body.description.as_deref().unwrap_or("");
1020 match build_team_response(
1021 &client,
1022 &kref,
1023 &body.name,
1024 description,
1025 bundle.deprecated,
1026 bundle.created_at.clone(),
1027 )
1028 .await
1029 {
1030 Ok(team) => {
1031 invalidate_team_cache();
1032 Json(serde_json::json!({ "team": team })).into_response()
1033 }
1034 Err(e) => kumiho_err(e).into_response(),
1035 }
1036}
1037
1038pub async fn handle_delete_team(
1040 State(state): State<AppState>,
1041 headers: HeaderMap,
1042 Path(kref): Path<String>,
1043) -> impl IntoResponse {
1044 if let Err(e) = require_auth(&state, &headers) {
1045 return e.into_response();
1046 }
1047
1048 let kref = normalize_kref(&kref);
1049 let client = build_kumiho_client(&state);
1050
1051 if let Ok(members_resp) = client.list_bundle_members(&kref).await {
1053 for member in &members_resp.members {
1054 if let Ok(rev) = client.get_published_or_latest(&member.item_kref).await {
1055 if let Ok(edges) = client.list_edges(&rev.kref, None, Some("outgoing")).await {
1056 for edge in edges {
1057 let _ = client
1058 .delete_edge(&edge.source_kref, &edge.target_kref, &edge.edge_type)
1059 .await;
1060 }
1061 }
1062 }
1063 }
1064
1065 for member in &members_resp.members {
1067 let _ = client.remove_bundle_member(&kref, &member.item_kref).await;
1068 }
1069 }
1070
1071 match client.delete_bundle(&kref).await {
1073 Ok(()) => {
1074 invalidate_team_cache();
1075 StatusCode::NO_CONTENT.into_response()
1076 }
1077 Err(e) => kumiho_err(e).into_response(),
1078 }
1079}
1080
1081pub async fn handle_deprecate_team(
1083 State(state): State<AppState>,
1084 headers: HeaderMap,
1085 Json(body): Json<DeprecateBody>,
1086) -> impl IntoResponse {
1087 if let Err(e) = require_auth(&state, &headers) {
1088 return e.into_response();
1089 }
1090
1091 let kref = body.kref.clone();
1092 let client = build_kumiho_client(&state);
1093
1094 match client.deprecate_team(&kref, body.deprecated).await {
1095 Ok(()) => {
1096 invalidate_team_cache();
1097 Json(serde_json::json!({ "success": true })).into_response()
1098 }
1099 Err(e) => kumiho_err(e).into_response(),
1100 }
1101}