1mod provider_companion;
2
3pub use provider_companion::{
4 add_provider_model_handler, create_provider_handler, delete_provider_handler,
5 delete_provider_model_handler, get_provider_handler, get_provider_models_handler,
6 get_providers_handler, resolve_provider_handler, update_provider_handler,
7};
8
9use agent_diva_agent::AgentEvent;
10use agent_diva_core::bus::InboundMessage;
11use agent_diva_core::config::schema::ChannelsConfig;
12use axum::{
13 extract::{Multipart, Path, Query, State},
14 response::sse::{Event, Sse},
15 Json,
16};
17use futures::stream::{Stream, StreamExt};
18use std::convert::Infallible;
19use tokio::sync::{mpsc, oneshot};
20use tokio_stream::wrappers::BroadcastStream;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use crate::state::{
24 ApiRequest, AppState, ChannelUpdate, ConfigResponse, ConfigUpdate, FileUploadRequest,
25 ManagerCommand, McpRefreshRequest, RunCronJobRequest, SetCronJobEnabledRequest,
26 SetMcpEnabledRequest, SkillUploadRequest, StopChatRequest, ToolsConfigResponse,
27 ToolsConfigUpdate,
28};
29
30#[derive(serde::Deserialize)]
31pub struct ChatRequest {
32 pub message: String,
33 pub channel: Option<String>,
34 pub chat_id: Option<String>,
35 pub attachments: Option<Vec<String>>,
36}
37
38#[derive(serde::Deserialize, Default)]
39pub struct EventsQuery {
40 pub channel: Option<String>,
41 pub chat_id: Option<String>,
42 pub chat_prefix: Option<String>,
43}
44
45pub async fn chat_handler(
46 State(state): State<AppState>,
47 Json(payload): Json<ChatRequest>,
48) -> Sse<futures::stream::BoxStream<'static, Result<Event, Infallible>>> {
49 let channel = payload.channel.unwrap_or("api".to_string());
50 let chat_id = payload.chat_id.unwrap_or("default".to_string());
51
52 if payload.message.trim() == "/stop" {
53 let (stop_tx, stop_rx) = oneshot::channel();
54 let stop_req = StopChatRequest {
55 channel: Some(channel),
56 chat_id: Some(chat_id),
57 };
58 let stop_send_result = state
59 .api_tx
60 .send(ManagerCommand::StopChat(stop_req, stop_tx))
61 .await;
62
63 let stop_message = match stop_send_result {
64 Ok(_) => match stop_rx.await {
65 Ok(Ok(_)) => "Generation stopped by user.".to_string(),
66 Ok(Err(e)) => format!("Failed to stop generation: {}", e),
67 Err(e) => format!("Failed to receive stop response: {}", e),
68 },
69 Err(e) => format!("Failed to send stop request: {}", e),
70 };
71
72 let stream =
73 futures::stream::once(
74 async move { Ok(Event::default().event("error").data(stop_message)) },
75 )
76 .boxed();
77 return Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default());
78 }
79
80 let (event_tx, event_rx) = mpsc::unbounded_channel();
81
82 let mut msg = InboundMessage::new(channel, "user", chat_id, payload.message);
83 if let Some(attachments) = payload.attachments {
84 for attachment in attachments {
85 msg = msg.with_media(attachment);
86 }
87 }
88
89 let req = ApiRequest { msg, event_tx };
90
91 if let Err(e) = state.api_tx.send(ManagerCommand::Chat(req)).await {
92 tracing::error!("Failed to send API request to manager: {}", e);
93 }
94
95 let stream = UnboundedReceiverStream::new(event_rx)
96 .map(|event| {
97 let evt = match event {
98 AgentEvent::AssistantDelta { text } => Event::default().event("delta").data(text),
99 AgentEvent::ReasoningDelta { text } => {
100 Event::default().event("reasoning_delta").data(text)
101 }
102 AgentEvent::ToolCallDelta { name, args_delta } => {
103 let data = serde_json::json!({
104 "name": name,
105 "delta": args_delta
106 });
107 Event::default().event("tool_delta").data(data.to_string())
108 }
109 AgentEvent::FinalResponse { content } => {
110 Event::default().event("final").data(content)
111 }
112 AgentEvent::ToolCallStarted {
113 name,
114 args_preview,
115 call_id,
116 } => {
117 let data = serde_json::json!({
118 "name": name,
119 "args": args_preview,
120 "id": call_id
121 });
122 Event::default().event("tool_start").data(data.to_string())
123 }
124 AgentEvent::ToolCallFinished {
125 name,
126 result,
127 is_error,
128 call_id,
129 } => {
130 let data = serde_json::json!({
131 "name": name,
132 "result": result,
133 "error": is_error,
134 "id": call_id
135 });
136 Event::default().event("tool_finish").data(data.to_string())
137 }
138 AgentEvent::Error { message } => Event::default().event("error").data(message),
139 _ => Event::default().comment("keep-alive"),
140 };
141 Ok(evt)
142 })
143 .boxed();
144
145 Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
146}
147
148pub async fn stop_chat_handler(
149 State(state): State<AppState>,
150 Json(payload): Json<StopChatRequest>,
151) -> Json<serde_json::Value> {
152 let (tx, rx) = oneshot::channel();
153 if let Err(e) = state
154 .api_tx
155 .send(ManagerCommand::StopChat(payload, tx))
156 .await
157 {
158 tracing::error!("Failed to send StopChat request: {}", e);
159 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
160 }
161
162 match rx.await {
163 Ok(Ok(stopped)) => Json(serde_json::json!({ "status": "ok", "stopped": stopped })),
164 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
165 Err(e) => {
166 tracing::error!("Failed to receive StopChat response: {}", e);
167 Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
168 }
169 }
170}
171
172pub async fn reset_session_handler(
173 State(state): State<AppState>,
174 Json(payload): Json<crate::state::ResetSessionRequest>,
175) -> Json<serde_json::Value> {
176 let (tx, rx) = oneshot::channel();
177 if let Err(e) = state
178 .api_tx
179 .send(ManagerCommand::ResetSession(payload, tx))
180 .await
181 {
182 tracing::error!("Failed to send ResetSession request: {}", e);
183 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
184 }
185
186 match rx.await {
187 Ok(Ok(reset)) => Json(serde_json::json!({ "status": "ok", "reset": reset })),
188 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
189 Err(e) => {
190 tracing::error!("Failed to receive ResetSession response: {}", e);
191 Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
192 }
193 }
194}
195
196pub async fn get_sessions_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
197 let (tx, rx) = oneshot::channel();
198 if let Err(e) = state.api_tx.send(ManagerCommand::GetSessions(tx)).await {
199 tracing::error!("Failed to send GetSessions request: {}", e);
200 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
201 }
202
203 match rx.await {
204 Ok(Ok(sessions)) => Json(serde_json::json!({ "status": "ok", "sessions": sessions })),
205 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
206 Err(e) => {
207 tracing::error!("Failed to receive GetSessions response: {}", e);
208 Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
209 }
210 }
211}
212
213pub async fn get_session_history_handler(
214 State(state): State<AppState>,
215 Path(id): Path<String>,
216) -> Json<serde_json::Value> {
217 let session_key = if !id.contains(':') {
220 format!("gui:{}", id) } else {
222 id
223 };
224
225 let (tx, rx) = oneshot::channel();
226 if let Err(e) = state
227 .api_tx
228 .send(ManagerCommand::GetSessionHistory(session_key.clone(), tx))
229 .await
230 {
231 tracing::error!("Failed to send GetSessionHistory request: {}", e);
232 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
233 }
234
235 match rx.await {
236 Ok(Ok(Some(session))) => Json(serde_json::json!({ "status": "ok", "session": session })),
237 Ok(Ok(None)) => {
238 Json(serde_json::json!({ "status": "error", "message": "Session not found" }))
239 }
240 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
241 Err(e) => {
242 tracing::error!("Failed to receive GetSessionHistory response: {}", e);
243 Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
244 }
245 }
246}
247
248pub async fn delete_session_handler(
249 State(state): State<AppState>,
250 Path(id): Path<String>,
251) -> Json<serde_json::Value> {
252 do_delete_session(state, id).await
253}
254
255async fn do_delete_session(state: AppState, id: String) -> Json<serde_json::Value> {
256 let session_key = if !id.contains(':') {
257 format!("gui:{}", id)
258 } else {
259 id
260 };
261
262 let (tx, rx) = oneshot::channel();
263 if let Err(e) = state
264 .api_tx
265 .send(ManagerCommand::DeleteSession(session_key.clone(), tx))
266 .await
267 {
268 tracing::error!("Failed to send DeleteSession request: {}", e);
269 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
270 }
271
272 match rx.await {
273 Ok(Ok(deleted)) => {
274 tracing::info!(
275 session_key = %session_key,
276 deleted,
277 "DeleteSession completed"
278 );
279 Json(serde_json::json!({ "status": "ok", "deleted": deleted }))
280 }
281 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
282 Err(e) => {
283 tracing::error!("Failed to receive DeleteSession response: {}", e);
284 Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
285 }
286 }
287}
288
289pub async fn events_handler(
290 State(state): State<AppState>,
291 Query(query): Query<EventsQuery>,
292) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
293 let event_rx = state.bus.subscribe_events();
294 let channel_filter = query.channel;
295 let chat_id_filter = query.chat_id;
296 let chat_prefix_filter = query.chat_prefix;
297
298 let stream = BroadcastStream::new(event_rx).filter_map(move |evt| {
299 let channel_filter = channel_filter.clone();
300 let chat_id_filter = chat_id_filter.clone();
301 let chat_prefix_filter = chat_prefix_filter.clone();
302 async move {
303 let Ok(bus_event) = evt else {
304 return None;
305 };
306
307 if let Some(ch) = &channel_filter {
308 if bus_event.channel != *ch {
309 return None;
310 }
311 }
312 if let Some(chat_id) = &chat_id_filter {
313 if bus_event.chat_id != *chat_id {
314 return None;
315 }
316 }
317 if let Some(prefix) = &chat_prefix_filter {
318 if !bus_event.chat_id.starts_with(prefix) {
319 return None;
320 }
321 }
322
323 match bus_event.event {
324 AgentEvent::FinalResponse { content } => {
325 let data = serde_json::json!({
326 "channel": bus_event.channel,
327 "chat_id": bus_event.chat_id,
328 "content": content
329 });
330 Some(Ok(Event::default().event("final").data(data.to_string())))
331 }
332 AgentEvent::Error { message } => {
333 let data = serde_json::json!({
334 "channel": bus_event.channel,
335 "chat_id": bus_event.chat_id,
336 "message": message
337 });
338 Some(Ok(Event::default().event("error").data(data.to_string())))
339 }
340 _ => None,
341 }
342 }
343 });
344
345 Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
346}
347
348pub async fn get_config_handler(State(state): State<AppState>) -> Json<ConfigResponse> {
349 let (tx, rx) = oneshot::channel();
350 if let Err(e) = state.api_tx.send(ManagerCommand::GetConfig(tx)).await {
351 tracing::error!("Failed to send GetConfig request: {}", e);
352 return Json(ConfigResponse {
353 provider: Some("deepseek".to_string()),
354 api_base: None,
355 model: "deepseek-chat".to_string(),
356 has_api_key: false,
357 });
358 }
359
360 match rx.await {
361 Ok(resp) => Json(resp),
362 Err(e) => {
363 tracing::error!("Failed to receive GetConfig response: {}", e);
364 Json(ConfigResponse {
365 provider: Some("deepseek".to_string()),
366 api_base: None,
367 model: "deepseek-chat".to_string(),
368 has_api_key: false,
369 })
370 }
371 }
372}
373
374pub async fn update_config_handler(
375 State(state): State<AppState>,
376 Json(payload): Json<ConfigUpdate>,
377) -> Json<serde_json::Value> {
378 tracing::info!("Received update config request: {:?}", payload);
379 if let Err(e) = state
380 .api_tx
381 .send(ManagerCommand::UpdateConfig(payload))
382 .await
383 {
384 tracing::error!("Failed to send UpdateConfig request: {}", e);
385 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
386 }
387
388 Json(serde_json::json!({ "status": "ok" }))
389}
390
391pub async fn get_channels_handler(State(state): State<AppState>) -> Json<ChannelsConfig> {
392 let (tx, rx) = oneshot::channel();
393 if let Err(e) = state.api_tx.send(ManagerCommand::GetChannels(tx)).await {
394 tracing::error!("Failed to send GetChannels request: {}", e);
395 return Json(ChannelsConfig::default());
396 }
397 match rx.await {
398 Ok(config) => Json(config),
399 Err(e) => {
400 tracing::error!("Failed to receive GetChannels response: {}", e);
401 Json(ChannelsConfig::default())
402 }
403 }
404}
405
406pub async fn get_tools_handler(State(state): State<AppState>) -> Json<ToolsConfigResponse> {
407 let (tx, rx) = oneshot::channel();
408 if let Err(e) = state.api_tx.send(ManagerCommand::GetTools(tx)).await {
409 tracing::error!("Failed to send GetTools request: {}", e);
410 return Json(ToolsConfigResponse {
411 web: agent_diva_core::config::schema::WebToolsConfig::default().into(),
412 });
413 }
414 match rx.await {
415 Ok(config) => Json(config),
416 Err(e) => {
417 tracing::error!("Failed to receive GetTools response: {}", e);
418 Json(ToolsConfigResponse {
419 web: agent_diva_core::config::schema::WebToolsConfig::default().into(),
420 })
421 }
422 }
423}
424
425pub async fn get_skills_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
426 let (tx, rx) = oneshot::channel();
427 if let Err(e) = state.api_tx.send(ManagerCommand::GetSkills(tx)).await {
428 tracing::error!("Failed to send GetSkills request: {}", e);
429 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
430 }
431 match rx.await {
432 Ok(Ok(skills)) => Json(serde_json::json!({ "status": "ok", "skills": skills })),
433 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
434 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
435 }
436}
437
438pub async fn get_mcps_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
439 let (tx, rx) = oneshot::channel();
440 if let Err(e) = state.api_tx.send(ManagerCommand::GetMcps(tx)).await {
441 tracing::error!("Failed to send GetMcps request: {}", e);
442 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
443 }
444 match rx.await {
445 Ok(Ok(mcps)) => Json(serde_json::json!({ "status": "ok", "mcps": mcps })),
446 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
447 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
448 }
449}
450
451pub async fn create_mcp_handler(
452 State(state): State<AppState>,
453 Json(payload): Json<crate::mcp_service::McpServerUpsert>,
454) -> Json<serde_json::Value> {
455 let (tx, rx) = oneshot::channel();
456 if let Err(e) = state
457 .api_tx
458 .send(ManagerCommand::CreateMcp(payload, tx))
459 .await
460 {
461 tracing::error!("Failed to send CreateMcp request: {}", e);
462 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
463 }
464 match rx.await {
465 Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
466 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
467 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
468 }
469}
470
471pub async fn update_mcp_handler(
472 State(state): State<AppState>,
473 Path(name): Path<String>,
474 Json(payload): Json<crate::mcp_service::McpServerUpsert>,
475) -> Json<serde_json::Value> {
476 let (tx, rx) = oneshot::channel();
477 if let Err(e) = state
478 .api_tx
479 .send(ManagerCommand::UpdateMcp(name, payload, tx))
480 .await
481 {
482 tracing::error!("Failed to send UpdateMcp request: {}", e);
483 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
484 }
485 match rx.await {
486 Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
487 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
488 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
489 }
490}
491
492pub async fn delete_mcp_handler(
493 State(state): State<AppState>,
494 Path(name): Path<String>,
495) -> Json<serde_json::Value> {
496 let (tx, rx) = oneshot::channel();
497 if let Err(e) = state.api_tx.send(ManagerCommand::DeleteMcp(name, tx)).await {
498 tracing::error!("Failed to send DeleteMcp request: {}", e);
499 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
500 }
501 match rx.await {
502 Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
503 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
504 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
505 }
506}
507
508pub async fn set_mcp_enabled_handler(
509 State(state): State<AppState>,
510 Path(name): Path<String>,
511 Json(payload): Json<SetMcpEnabledRequest>,
512) -> Json<serde_json::Value> {
513 let (tx, rx) = oneshot::channel();
514 if let Err(e) = state
515 .api_tx
516 .send(ManagerCommand::SetMcpEnabled(name, payload.enabled, tx))
517 .await
518 {
519 tracing::error!("Failed to send SetMcpEnabled request: {}", e);
520 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
521 }
522 match rx.await {
523 Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
524 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
525 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
526 }
527}
528
529pub async fn refresh_mcp_status_handler(
530 State(state): State<AppState>,
531 Path(name): Path<String>,
532 Json(_payload): Json<McpRefreshRequest>,
533) -> Json<serde_json::Value> {
534 let (tx, rx) = oneshot::channel();
535 if let Err(e) = state
536 .api_tx
537 .send(ManagerCommand::RefreshMcpStatus(name, tx))
538 .await
539 {
540 tracing::error!("Failed to send RefreshMcpStatus request: {}", e);
541 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
542 }
543 match rx.await {
544 Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
545 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
546 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
547 }
548}
549
550pub async fn upload_skill_handler(
551 State(state): State<AppState>,
552 mut multipart: Multipart,
553) -> Json<serde_json::Value> {
554 let mut file_name: Option<String> = None;
555 let mut bytes: Option<Vec<u8>> = None;
556
557 loop {
558 let field = match multipart.next_field().await {
559 Ok(Some(field)) => field,
560 Ok(None) => break,
561 Err(e) => {
562 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
563 }
564 };
565 if field.name() != Some("file") {
566 continue;
567 }
568 file_name = field.file_name().map(ToString::to_string);
569 match field.bytes().await {
570 Ok(body) => bytes = Some(body.to_vec()),
571 Err(e) => {
572 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
573 }
574 }
575 }
576
577 let Some(file_name) = file_name else {
578 return Json(serde_json::json!({ "status": "error", "message": "missing file upload" }));
579 };
580 let Some(bytes) = bytes else {
581 return Json(serde_json::json!({ "status": "error", "message": "missing file body" }));
582 };
583
584 let (tx, rx) = oneshot::channel();
585 let request = SkillUploadRequest { file_name, bytes };
586 if let Err(e) = state
587 .api_tx
588 .send(ManagerCommand::UploadSkill(request, tx))
589 .await
590 {
591 tracing::error!("Failed to send UploadSkill request: {}", e);
592 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
593 }
594 match rx.await {
595 Ok(Ok(skill)) => Json(serde_json::json!({ "status": "ok", "skill": skill })),
596 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
597 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
598 }
599}
600
601pub async fn upload_file_handler(
602 State(state): State<AppState>,
603 mut multipart: Multipart,
604) -> Json<serde_json::Value> {
605 let mut file_name: Option<String> = None;
606 let mut bytes: Option<Vec<u8>> = None;
607 let mut channel: Option<String> = None;
608 let mut message_id: Option<String> = None;
609
610 loop {
611 let field = match multipart.next_field().await {
612 Ok(Some(field)) => field,
613 Ok(None) => break,
614 Err(e) => {
615 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
616 }
617 };
618 let name = field.name().map(ToString::to_string);
619 match name.as_deref() {
620 Some("file") => {
621 file_name = field.file_name().map(ToString::to_string);
622 match field.bytes().await {
623 Ok(body) => bytes = Some(body.to_vec()),
624 Err(e) => {
625 return Json(
626 serde_json::json!({ "status": "error", "message": e.to_string() }),
627 );
628 }
629 }
630 }
631 Some("channel") => match field.text().await {
632 Ok(text) => channel = Some(text),
633 Err(e) => {
634 return Json(
635 serde_json::json!({ "status": "error", "message": e.to_string() }),
636 );
637 }
638 },
639 Some("message_id") => {
640 if let Ok(text) = field.text().await {
641 message_id = Some(text);
642 }
643 }
644 _ => {}
645 }
646 }
647
648 let Some(file_name) = file_name else {
649 return Json(serde_json::json!({ "status": "error", "message": "missing file upload" }));
650 };
651 let Some(bytes) = bytes else {
652 return Json(serde_json::json!({ "status": "error", "message": "missing file body" }));
653 };
654 let Some(channel) = channel else {
655 return Json(serde_json::json!({ "status": "error", "message": "missing channel" }));
656 };
657
658 let (tx, rx) = oneshot::channel();
659 let request = FileUploadRequest {
660 file_name,
661 bytes,
662 channel,
663 message_id: message_id.clone(),
664 };
665 if let Err(e) = state
666 .api_tx
667 .send(ManagerCommand::UploadFile(request, tx))
668 .await
669 {
670 tracing::error!("Failed to send UploadFile request: {}", e);
671 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
672 }
673 match rx.await {
674 Ok(Ok(attachment)) => Json(serde_json::json!({ "status": "ok", "attachment": attachment })),
675 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
676 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
677 }
678}
679
680pub async fn delete_skill_handler(
681 State(state): State<AppState>,
682 Path(name): Path<String>,
683) -> Json<serde_json::Value> {
684 let (tx, rx) = oneshot::channel();
685 if let Err(e) = state
686 .api_tx
687 .send(ManagerCommand::DeleteSkill(name, tx))
688 .await
689 {
690 tracing::error!("Failed to send DeleteSkill request: {}", e);
691 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
692 }
693 match rx.await {
694 Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
695 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
696 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
697 }
698}
699
700pub async fn update_tools_handler(
701 State(state): State<AppState>,
702 Json(payload): Json<ToolsConfigUpdate>,
703) -> Json<serde_json::Value> {
704 tracing::info!("Received update tools request");
705 if let Err(e) = state
706 .api_tx
707 .send(ManagerCommand::UpdateTools(payload))
708 .await
709 {
710 tracing::error!("Failed to send UpdateTools request: {}", e);
711 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
712 }
713 Json(serde_json::json!({ "status": "ok" }))
714}
715
716pub async fn update_channel_handler(
717 State(state): State<AppState>,
718 Json(payload): Json<ChannelUpdate>,
719) -> Json<serde_json::Value> {
720 tracing::info!("Received update channel request: {}", payload.name);
721 if let Err(e) = state
722 .api_tx
723 .send(ManagerCommand::UpdateChannel(payload))
724 .await
725 {
726 tracing::error!("Failed to send UpdateChannel request: {}", e);
727 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
728 }
729
730 Json(serde_json::json!({ "status": "ok" }))
731}
732
/// Liveness probe: always answers with the literal body `ok`.
pub async fn heartbeat_handler() -> &'static str {
    const HEARTBEAT_BODY: &str = "ok";
    HEARTBEAT_BODY
}
736
737pub async fn list_cron_jobs_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
738 let (tx, rx) = oneshot::channel();
739 if let Err(e) = state.api_tx.send(ManagerCommand::ListCronJobs(tx)).await {
740 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
741 }
742 match rx.await {
743 Ok(Ok(jobs)) => Json(serde_json::json!({ "status": "ok", "jobs": jobs })),
744 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
745 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
746 }
747}
748
749pub async fn get_cron_job_handler(
750 State(state): State<AppState>,
751 Path(id): Path<String>,
752) -> Json<serde_json::Value> {
753 let (tx, rx) = oneshot::channel();
754 if let Err(e) = state.api_tx.send(ManagerCommand::GetCronJob(id, tx)).await {
755 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
756 }
757 match rx.await {
758 Ok(Ok(Some(job))) => Json(serde_json::json!({ "status": "ok", "job": job })),
759 Ok(Ok(None)) => Json(serde_json::json!({ "status": "error", "message": "Job not found" })),
760 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
761 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
762 }
763}
764
765pub async fn create_cron_job_handler(
766 State(state): State<AppState>,
767 Json(payload): Json<agent_diva_core::cron::CreateCronJobRequest>,
768) -> Json<serde_json::Value> {
769 let (tx, rx) = oneshot::channel();
770 if let Err(e) = state
771 .api_tx
772 .send(ManagerCommand::CreateCronJob(payload, tx))
773 .await
774 {
775 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
776 }
777 match rx.await {
778 Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
779 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
780 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
781 }
782}
783
784pub async fn update_cron_job_handler(
785 State(state): State<AppState>,
786 Path(id): Path<String>,
787 Json(payload): Json<agent_diva_core::cron::UpdateCronJobRequest>,
788) -> Json<serde_json::Value> {
789 let (tx, rx) = oneshot::channel();
790 if let Err(e) = state
791 .api_tx
792 .send(ManagerCommand::UpdateCronJob(id, payload, tx))
793 .await
794 {
795 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
796 }
797 match rx.await {
798 Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
799 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
800 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
801 }
802}
803
804pub async fn set_cron_job_enabled_handler(
805 State(state): State<AppState>,
806 Path(id): Path<String>,
807 Json(payload): Json<SetCronJobEnabledRequest>,
808) -> Json<serde_json::Value> {
809 let (tx, rx) = oneshot::channel();
810 if let Err(e) = state
811 .api_tx
812 .send(ManagerCommand::SetCronJobEnabled(id, payload.enabled, tx))
813 .await
814 {
815 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
816 }
817 match rx.await {
818 Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
819 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
820 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
821 }
822}
823
824pub async fn run_cron_job_handler(
825 State(state): State<AppState>,
826 Path(id): Path<String>,
827 Json(payload): Json<RunCronJobRequest>,
828) -> Json<serde_json::Value> {
829 let (tx, rx) = oneshot::channel();
830 if let Err(e) = state
831 .api_tx
832 .send(ManagerCommand::RunCronJobNow(id, payload.force, tx))
833 .await
834 {
835 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
836 }
837 match rx.await {
838 Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
839 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
840 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
841 }
842}
843
844pub async fn stop_cron_job_handler(
845 State(state): State<AppState>,
846 Path(id): Path<String>,
847) -> Json<serde_json::Value> {
848 let (tx, rx) = oneshot::channel();
849 if let Err(e) = state
850 .api_tx
851 .send(ManagerCommand::StopCronJobRun(id, tx))
852 .await
853 {
854 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
855 }
856 match rx.await {
857 Ok(Ok(run)) => Json(serde_json::json!({ "status": "ok", "run": run })),
858 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
859 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
860 }
861}
862
863pub async fn delete_cron_job_handler(
864 State(state): State<AppState>,
865 Path(id): Path<String>,
866) -> Json<serde_json::Value> {
867 let (tx, rx) = oneshot::channel();
868 if let Err(e) = state
869 .api_tx
870 .send(ManagerCommand::DeleteCronJob(id, tx))
871 .await
872 {
873 return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
874 }
875 match rx.await {
876 Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
877 Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
878 Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
879 }
880}