Skip to main content

agent_diva_manager/
handlers.rs

1mod provider_companion;
2
3pub use provider_companion::{
4    add_provider_model_handler, create_provider_handler, delete_provider_handler,
5    delete_provider_model_handler, get_provider_handler, get_provider_models_handler,
6    get_providers_handler, resolve_provider_handler, update_provider_handler,
7};
8
9use agent_diva_agent::AgentEvent;
10use agent_diva_core::bus::InboundMessage;
11use agent_diva_core::config::schema::ChannelsConfig;
12use axum::{
13    extract::{Multipart, Path, Query, State},
14    response::sse::{Event, Sse},
15    Json,
16};
17use futures::stream::{Stream, StreamExt};
18use std::convert::Infallible;
19use tokio::sync::{mpsc, oneshot};
20use tokio_stream::wrappers::BroadcastStream;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use crate::state::{
24    ApiRequest, AppState, ChannelUpdate, ConfigResponse, ConfigUpdate, FileUploadRequest,
25    ManagerCommand, McpRefreshRequest, RunCronJobRequest, SetCronJobEnabledRequest,
26    SetMcpEnabledRequest, SkillUploadRequest, StopChatRequest, ToolsConfigResponse,
27    ToolsConfigUpdate,
28};
29
30#[derive(serde::Deserialize)]
31pub struct ChatRequest {
32    pub message: String,
33    pub channel: Option<String>,
34    pub chat_id: Option<String>,
35    pub attachments: Option<Vec<String>>,
36}
37
38#[derive(serde::Deserialize, Default)]
39pub struct EventsQuery {
40    pub channel: Option<String>,
41    pub chat_id: Option<String>,
42    pub chat_prefix: Option<String>,
43}
44
45pub async fn chat_handler(
46    State(state): State<AppState>,
47    Json(payload): Json<ChatRequest>,
48) -> Sse<futures::stream::BoxStream<'static, Result<Event, Infallible>>> {
49    let channel = payload.channel.unwrap_or("api".to_string());
50    let chat_id = payload.chat_id.unwrap_or("default".to_string());
51
52    if payload.message.trim() == "/stop" {
53        let (stop_tx, stop_rx) = oneshot::channel();
54        let stop_req = StopChatRequest {
55            channel: Some(channel),
56            chat_id: Some(chat_id),
57        };
58        let stop_send_result = state
59            .api_tx
60            .send(ManagerCommand::StopChat(stop_req, stop_tx))
61            .await;
62
63        let stop_message = match stop_send_result {
64            Ok(_) => match stop_rx.await {
65                Ok(Ok(_)) => "Generation stopped by user.".to_string(),
66                Ok(Err(e)) => format!("Failed to stop generation: {}", e),
67                Err(e) => format!("Failed to receive stop response: {}", e),
68            },
69            Err(e) => format!("Failed to send stop request: {}", e),
70        };
71
72        let stream =
73            futures::stream::once(
74                async move { Ok(Event::default().event("error").data(stop_message)) },
75            )
76            .boxed();
77        return Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default());
78    }
79
80    let (event_tx, event_rx) = mpsc::unbounded_channel();
81
82    let mut msg = InboundMessage::new(channel, "user", chat_id, payload.message);
83    if let Some(attachments) = payload.attachments {
84        for attachment in attachments {
85            msg = msg.with_media(attachment);
86        }
87    }
88
89    let req = ApiRequest { msg, event_tx };
90
91    if let Err(e) = state.api_tx.send(ManagerCommand::Chat(req)).await {
92        tracing::error!("Failed to send API request to manager: {}", e);
93    }
94
95    let stream = UnboundedReceiverStream::new(event_rx)
96        .map(|event| {
97            let evt = match event {
98                AgentEvent::AssistantDelta { text } => Event::default().event("delta").data(text),
99                AgentEvent::ReasoningDelta { text } => {
100                    Event::default().event("reasoning_delta").data(text)
101                }
102                AgentEvent::ToolCallDelta { name, args_delta } => {
103                    let data = serde_json::json!({
104                        "name": name,
105                        "delta": args_delta
106                    });
107                    Event::default().event("tool_delta").data(data.to_string())
108                }
109                AgentEvent::FinalResponse { content } => {
110                    Event::default().event("final").data(content)
111                }
112                AgentEvent::ToolCallStarted {
113                    name,
114                    args_preview,
115                    call_id,
116                } => {
117                    let data = serde_json::json!({
118                        "name": name,
119                        "args": args_preview,
120                        "id": call_id
121                    });
122                    Event::default().event("tool_start").data(data.to_string())
123                }
124                AgentEvent::ToolCallFinished {
125                    name,
126                    result,
127                    is_error,
128                    call_id,
129                } => {
130                    let data = serde_json::json!({
131                        "name": name,
132                        "result": result,
133                        "error": is_error,
134                        "id": call_id
135                    });
136                    Event::default().event("tool_finish").data(data.to_string())
137                }
138                AgentEvent::Error { message } => Event::default().event("error").data(message),
139                _ => Event::default().comment("keep-alive"),
140            };
141            Ok(evt)
142        })
143        .boxed();
144
145    Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
146}
147
148pub async fn stop_chat_handler(
149    State(state): State<AppState>,
150    Json(payload): Json<StopChatRequest>,
151) -> Json<serde_json::Value> {
152    let (tx, rx) = oneshot::channel();
153    if let Err(e) = state
154        .api_tx
155        .send(ManagerCommand::StopChat(payload, tx))
156        .await
157    {
158        tracing::error!("Failed to send StopChat request: {}", e);
159        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
160    }
161
162    match rx.await {
163        Ok(Ok(stopped)) => Json(serde_json::json!({ "status": "ok", "stopped": stopped })),
164        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
165        Err(e) => {
166            tracing::error!("Failed to receive StopChat response: {}", e);
167            Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
168        }
169    }
170}
171
172pub async fn reset_session_handler(
173    State(state): State<AppState>,
174    Json(payload): Json<crate::state::ResetSessionRequest>,
175) -> Json<serde_json::Value> {
176    let (tx, rx) = oneshot::channel();
177    if let Err(e) = state
178        .api_tx
179        .send(ManagerCommand::ResetSession(payload, tx))
180        .await
181    {
182        tracing::error!("Failed to send ResetSession request: {}", e);
183        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
184    }
185
186    match rx.await {
187        Ok(Ok(reset)) => Json(serde_json::json!({ "status": "ok", "reset": reset })),
188        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
189        Err(e) => {
190            tracing::error!("Failed to receive ResetSession response: {}", e);
191            Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
192        }
193    }
194}
195
196pub async fn get_sessions_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
197    let (tx, rx) = oneshot::channel();
198    if let Err(e) = state.api_tx.send(ManagerCommand::GetSessions(tx)).await {
199        tracing::error!("Failed to send GetSessions request: {}", e);
200        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
201    }
202
203    match rx.await {
204        Ok(Ok(sessions)) => Json(serde_json::json!({ "status": "ok", "sessions": sessions })),
205        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
206        Err(e) => {
207            tracing::error!("Failed to receive GetSessions response: {}", e);
208            Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
209        }
210    }
211}
212
213pub async fn get_session_history_handler(
214    State(state): State<AppState>,
215    Path(id): Path<String>,
216) -> Json<serde_json::Value> {
217    // If the path just gives an id (e.g. from frontend gui), then assume channel is implicit, normally the id comes as format `channel:chat_id` but frontend may just send `chat_id`. Wait, let the frontend send `channel:chat_id` via the path or query.
218    // To support fetching any session_key, we will decode the path parameter if it's url encoded, or just use it as is.
219    let session_key = if !id.contains(':') {
220        format!("gui:{}", id) // fallback for backwards compatibility or assumptions
221    } else {
222        id
223    };
224
225    let (tx, rx) = oneshot::channel();
226    if let Err(e) = state
227        .api_tx
228        .send(ManagerCommand::GetSessionHistory(session_key.clone(), tx))
229        .await
230    {
231        tracing::error!("Failed to send GetSessionHistory request: {}", e);
232        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
233    }
234
235    match rx.await {
236        Ok(Ok(Some(session))) => Json(serde_json::json!({ "status": "ok", "session": session })),
237        Ok(Ok(None)) => {
238            Json(serde_json::json!({ "status": "error", "message": "Session not found" }))
239        }
240        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
241        Err(e) => {
242            tracing::error!("Failed to receive GetSessionHistory response: {}", e);
243            Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
244        }
245    }
246}
247
248pub async fn delete_session_handler(
249    State(state): State<AppState>,
250    Path(id): Path<String>,
251) -> Json<serde_json::Value> {
252    do_delete_session(state, id).await
253}
254
255async fn do_delete_session(state: AppState, id: String) -> Json<serde_json::Value> {
256    let session_key = if !id.contains(':') {
257        format!("gui:{}", id)
258    } else {
259        id
260    };
261
262    let (tx, rx) = oneshot::channel();
263    if let Err(e) = state
264        .api_tx
265        .send(ManagerCommand::DeleteSession(session_key.clone(), tx))
266        .await
267    {
268        tracing::error!("Failed to send DeleteSession request: {}", e);
269        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
270    }
271
272    match rx.await {
273        Ok(Ok(deleted)) => {
274            tracing::info!(
275                session_key = %session_key,
276                deleted,
277                "DeleteSession completed"
278            );
279            Json(serde_json::json!({ "status": "ok", "deleted": deleted }))
280        }
281        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
282        Err(e) => {
283            tracing::error!("Failed to receive DeleteSession response: {}", e);
284            Json(serde_json::json!({ "status": "error", "message": e.to_string() }))
285        }
286    }
287}
288
289pub async fn events_handler(
290    State(state): State<AppState>,
291    Query(query): Query<EventsQuery>,
292) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
293    let event_rx = state.bus.subscribe_events();
294    let channel_filter = query.channel;
295    let chat_id_filter = query.chat_id;
296    let chat_prefix_filter = query.chat_prefix;
297
298    let stream = BroadcastStream::new(event_rx).filter_map(move |evt| {
299        let channel_filter = channel_filter.clone();
300        let chat_id_filter = chat_id_filter.clone();
301        let chat_prefix_filter = chat_prefix_filter.clone();
302        async move {
303            let Ok(bus_event) = evt else {
304                return None;
305            };
306
307            if let Some(ch) = &channel_filter {
308                if bus_event.channel != *ch {
309                    return None;
310                }
311            }
312            if let Some(chat_id) = &chat_id_filter {
313                if bus_event.chat_id != *chat_id {
314                    return None;
315                }
316            }
317            if let Some(prefix) = &chat_prefix_filter {
318                if !bus_event.chat_id.starts_with(prefix) {
319                    return None;
320                }
321            }
322
323            match bus_event.event {
324                AgentEvent::FinalResponse { content } => {
325                    let data = serde_json::json!({
326                        "channel": bus_event.channel,
327                        "chat_id": bus_event.chat_id,
328                        "content": content
329                    });
330                    Some(Ok(Event::default().event("final").data(data.to_string())))
331                }
332                AgentEvent::Error { message } => {
333                    let data = serde_json::json!({
334                        "channel": bus_event.channel,
335                        "chat_id": bus_event.chat_id,
336                        "message": message
337                    });
338                    Some(Ok(Event::default().event("error").data(data.to_string())))
339                }
340                _ => None,
341            }
342        }
343    });
344
345    Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
346}
347
348pub async fn get_config_handler(State(state): State<AppState>) -> Json<ConfigResponse> {
349    let (tx, rx) = oneshot::channel();
350    if let Err(e) = state.api_tx.send(ManagerCommand::GetConfig(tx)).await {
351        tracing::error!("Failed to send GetConfig request: {}", e);
352        return Json(ConfigResponse {
353            provider: Some("deepseek".to_string()),
354            api_base: None,
355            model: "deepseek-chat".to_string(),
356            has_api_key: false,
357        });
358    }
359
360    match rx.await {
361        Ok(resp) => Json(resp),
362        Err(e) => {
363            tracing::error!("Failed to receive GetConfig response: {}", e);
364            Json(ConfigResponse {
365                provider: Some("deepseek".to_string()),
366                api_base: None,
367                model: "deepseek-chat".to_string(),
368                has_api_key: false,
369            })
370        }
371    }
372}
373
374pub async fn update_config_handler(
375    State(state): State<AppState>,
376    Json(payload): Json<ConfigUpdate>,
377) -> Json<serde_json::Value> {
378    tracing::info!("Received update config request: {:?}", payload);
379    if let Err(e) = state
380        .api_tx
381        .send(ManagerCommand::UpdateConfig(payload))
382        .await
383    {
384        tracing::error!("Failed to send UpdateConfig request: {}", e);
385        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
386    }
387
388    Json(serde_json::json!({ "status": "ok" }))
389}
390
391pub async fn get_channels_handler(State(state): State<AppState>) -> Json<ChannelsConfig> {
392    let (tx, rx) = oneshot::channel();
393    if let Err(e) = state.api_tx.send(ManagerCommand::GetChannels(tx)).await {
394        tracing::error!("Failed to send GetChannels request: {}", e);
395        return Json(ChannelsConfig::default());
396    }
397    match rx.await {
398        Ok(config) => Json(config),
399        Err(e) => {
400            tracing::error!("Failed to receive GetChannels response: {}", e);
401            Json(ChannelsConfig::default())
402        }
403    }
404}
405
406pub async fn get_tools_handler(State(state): State<AppState>) -> Json<ToolsConfigResponse> {
407    let (tx, rx) = oneshot::channel();
408    if let Err(e) = state.api_tx.send(ManagerCommand::GetTools(tx)).await {
409        tracing::error!("Failed to send GetTools request: {}", e);
410        return Json(ToolsConfigResponse {
411            web: agent_diva_core::config::schema::WebToolsConfig::default().into(),
412        });
413    }
414    match rx.await {
415        Ok(config) => Json(config),
416        Err(e) => {
417            tracing::error!("Failed to receive GetTools response: {}", e);
418            Json(ToolsConfigResponse {
419                web: agent_diva_core::config::schema::WebToolsConfig::default().into(),
420            })
421        }
422    }
423}
424
425pub async fn get_skills_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
426    let (tx, rx) = oneshot::channel();
427    if let Err(e) = state.api_tx.send(ManagerCommand::GetSkills(tx)).await {
428        tracing::error!("Failed to send GetSkills request: {}", e);
429        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
430    }
431    match rx.await {
432        Ok(Ok(skills)) => Json(serde_json::json!({ "status": "ok", "skills": skills })),
433        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
434        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
435    }
436}
437
438pub async fn get_mcps_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
439    let (tx, rx) = oneshot::channel();
440    if let Err(e) = state.api_tx.send(ManagerCommand::GetMcps(tx)).await {
441        tracing::error!("Failed to send GetMcps request: {}", e);
442        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
443    }
444    match rx.await {
445        Ok(Ok(mcps)) => Json(serde_json::json!({ "status": "ok", "mcps": mcps })),
446        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
447        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
448    }
449}
450
451pub async fn create_mcp_handler(
452    State(state): State<AppState>,
453    Json(payload): Json<crate::mcp_service::McpServerUpsert>,
454) -> Json<serde_json::Value> {
455    let (tx, rx) = oneshot::channel();
456    if let Err(e) = state
457        .api_tx
458        .send(ManagerCommand::CreateMcp(payload, tx))
459        .await
460    {
461        tracing::error!("Failed to send CreateMcp request: {}", e);
462        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
463    }
464    match rx.await {
465        Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
466        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
467        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
468    }
469}
470
471pub async fn update_mcp_handler(
472    State(state): State<AppState>,
473    Path(name): Path<String>,
474    Json(payload): Json<crate::mcp_service::McpServerUpsert>,
475) -> Json<serde_json::Value> {
476    let (tx, rx) = oneshot::channel();
477    if let Err(e) = state
478        .api_tx
479        .send(ManagerCommand::UpdateMcp(name, payload, tx))
480        .await
481    {
482        tracing::error!("Failed to send UpdateMcp request: {}", e);
483        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
484    }
485    match rx.await {
486        Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
487        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
488        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
489    }
490}
491
492pub async fn delete_mcp_handler(
493    State(state): State<AppState>,
494    Path(name): Path<String>,
495) -> Json<serde_json::Value> {
496    let (tx, rx) = oneshot::channel();
497    if let Err(e) = state.api_tx.send(ManagerCommand::DeleteMcp(name, tx)).await {
498        tracing::error!("Failed to send DeleteMcp request: {}", e);
499        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
500    }
501    match rx.await {
502        Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
503        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
504        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
505    }
506}
507
508pub async fn set_mcp_enabled_handler(
509    State(state): State<AppState>,
510    Path(name): Path<String>,
511    Json(payload): Json<SetMcpEnabledRequest>,
512) -> Json<serde_json::Value> {
513    let (tx, rx) = oneshot::channel();
514    if let Err(e) = state
515        .api_tx
516        .send(ManagerCommand::SetMcpEnabled(name, payload.enabled, tx))
517        .await
518    {
519        tracing::error!("Failed to send SetMcpEnabled request: {}", e);
520        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
521    }
522    match rx.await {
523        Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
524        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
525        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
526    }
527}
528
529pub async fn refresh_mcp_status_handler(
530    State(state): State<AppState>,
531    Path(name): Path<String>,
532    Json(_payload): Json<McpRefreshRequest>,
533) -> Json<serde_json::Value> {
534    let (tx, rx) = oneshot::channel();
535    if let Err(e) = state
536        .api_tx
537        .send(ManagerCommand::RefreshMcpStatus(name, tx))
538        .await
539    {
540        tracing::error!("Failed to send RefreshMcpStatus request: {}", e);
541        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
542    }
543    match rx.await {
544        Ok(Ok(mcp)) => Json(serde_json::json!({ "status": "ok", "mcp": mcp })),
545        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
546        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
547    }
548}
549
550pub async fn upload_skill_handler(
551    State(state): State<AppState>,
552    mut multipart: Multipart,
553) -> Json<serde_json::Value> {
554    let mut file_name: Option<String> = None;
555    let mut bytes: Option<Vec<u8>> = None;
556
557    loop {
558        let field = match multipart.next_field().await {
559            Ok(Some(field)) => field,
560            Ok(None) => break,
561            Err(e) => {
562                return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
563            }
564        };
565        if field.name() != Some("file") {
566            continue;
567        }
568        file_name = field.file_name().map(ToString::to_string);
569        match field.bytes().await {
570            Ok(body) => bytes = Some(body.to_vec()),
571            Err(e) => {
572                return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
573            }
574        }
575    }
576
577    let Some(file_name) = file_name else {
578        return Json(serde_json::json!({ "status": "error", "message": "missing file upload" }));
579    };
580    let Some(bytes) = bytes else {
581        return Json(serde_json::json!({ "status": "error", "message": "missing file body" }));
582    };
583
584    let (tx, rx) = oneshot::channel();
585    let request = SkillUploadRequest { file_name, bytes };
586    if let Err(e) = state
587        .api_tx
588        .send(ManagerCommand::UploadSkill(request, tx))
589        .await
590    {
591        tracing::error!("Failed to send UploadSkill request: {}", e);
592        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
593    }
594    match rx.await {
595        Ok(Ok(skill)) => Json(serde_json::json!({ "status": "ok", "skill": skill })),
596        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
597        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
598    }
599}
600
601pub async fn upload_file_handler(
602    State(state): State<AppState>,
603    mut multipart: Multipart,
604) -> Json<serde_json::Value> {
605    let mut file_name: Option<String> = None;
606    let mut bytes: Option<Vec<u8>> = None;
607    let mut channel: Option<String> = None;
608    let mut message_id: Option<String> = None;
609
610    loop {
611        let field = match multipart.next_field().await {
612            Ok(Some(field)) => field,
613            Ok(None) => break,
614            Err(e) => {
615                return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
616            }
617        };
618        let name = field.name().map(ToString::to_string);
619        match name.as_deref() {
620            Some("file") => {
621                file_name = field.file_name().map(ToString::to_string);
622                match field.bytes().await {
623                    Ok(body) => bytes = Some(body.to_vec()),
624                    Err(e) => {
625                        return Json(
626                            serde_json::json!({ "status": "error", "message": e.to_string() }),
627                        );
628                    }
629                }
630            }
631            Some("channel") => match field.text().await {
632                Ok(text) => channel = Some(text),
633                Err(e) => {
634                    return Json(
635                        serde_json::json!({ "status": "error", "message": e.to_string() }),
636                    );
637                }
638            },
639            Some("message_id") => {
640                if let Ok(text) = field.text().await {
641                    message_id = Some(text);
642                }
643            }
644            _ => {}
645        }
646    }
647
648    let Some(file_name) = file_name else {
649        return Json(serde_json::json!({ "status": "error", "message": "missing file upload" }));
650    };
651    let Some(bytes) = bytes else {
652        return Json(serde_json::json!({ "status": "error", "message": "missing file body" }));
653    };
654    let Some(channel) = channel else {
655        return Json(serde_json::json!({ "status": "error", "message": "missing channel" }));
656    };
657
658    let (tx, rx) = oneshot::channel();
659    let request = FileUploadRequest {
660        file_name,
661        bytes,
662        channel,
663        message_id: message_id.clone(),
664    };
665    if let Err(e) = state
666        .api_tx
667        .send(ManagerCommand::UploadFile(request, tx))
668        .await
669    {
670        tracing::error!("Failed to send UploadFile request: {}", e);
671        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
672    }
673    match rx.await {
674        Ok(Ok(attachment)) => Json(serde_json::json!({ "status": "ok", "attachment": attachment })),
675        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
676        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
677    }
678}
679
680pub async fn delete_skill_handler(
681    State(state): State<AppState>,
682    Path(name): Path<String>,
683) -> Json<serde_json::Value> {
684    let (tx, rx) = oneshot::channel();
685    if let Err(e) = state
686        .api_tx
687        .send(ManagerCommand::DeleteSkill(name, tx))
688        .await
689    {
690        tracing::error!("Failed to send DeleteSkill request: {}", e);
691        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
692    }
693    match rx.await {
694        Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
695        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
696        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
697    }
698}
699
700pub async fn update_tools_handler(
701    State(state): State<AppState>,
702    Json(payload): Json<ToolsConfigUpdate>,
703) -> Json<serde_json::Value> {
704    tracing::info!("Received update tools request");
705    if let Err(e) = state
706        .api_tx
707        .send(ManagerCommand::UpdateTools(payload))
708        .await
709    {
710        tracing::error!("Failed to send UpdateTools request: {}", e);
711        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
712    }
713    Json(serde_json::json!({ "status": "ok" }))
714}
715
716pub async fn update_channel_handler(
717    State(state): State<AppState>,
718    Json(payload): Json<ChannelUpdate>,
719) -> Json<serde_json::Value> {
720    tracing::info!("Received update channel request: {}", payload.name);
721    if let Err(e) = state
722        .api_tx
723        .send(ManagerCommand::UpdateChannel(payload))
724        .await
725    {
726        tracing::error!("Failed to send UpdateChannel request: {}", e);
727        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
728    }
729
730    Json(serde_json::json!({ "status": "ok" }))
731}
732
733pub async fn heartbeat_handler() -> &'static str {
734    "ok"
735}
736
737pub async fn list_cron_jobs_handler(State(state): State<AppState>) -> Json<serde_json::Value> {
738    let (tx, rx) = oneshot::channel();
739    if let Err(e) = state.api_tx.send(ManagerCommand::ListCronJobs(tx)).await {
740        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
741    }
742    match rx.await {
743        Ok(Ok(jobs)) => Json(serde_json::json!({ "status": "ok", "jobs": jobs })),
744        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
745        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
746    }
747}
748
749pub async fn get_cron_job_handler(
750    State(state): State<AppState>,
751    Path(id): Path<String>,
752) -> Json<serde_json::Value> {
753    let (tx, rx) = oneshot::channel();
754    if let Err(e) = state.api_tx.send(ManagerCommand::GetCronJob(id, tx)).await {
755        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
756    }
757    match rx.await {
758        Ok(Ok(Some(job))) => Json(serde_json::json!({ "status": "ok", "job": job })),
759        Ok(Ok(None)) => Json(serde_json::json!({ "status": "error", "message": "Job not found" })),
760        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
761        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
762    }
763}
764
765pub async fn create_cron_job_handler(
766    State(state): State<AppState>,
767    Json(payload): Json<agent_diva_core::cron::CreateCronJobRequest>,
768) -> Json<serde_json::Value> {
769    let (tx, rx) = oneshot::channel();
770    if let Err(e) = state
771        .api_tx
772        .send(ManagerCommand::CreateCronJob(payload, tx))
773        .await
774    {
775        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
776    }
777    match rx.await {
778        Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
779        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
780        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
781    }
782}
783
784pub async fn update_cron_job_handler(
785    State(state): State<AppState>,
786    Path(id): Path<String>,
787    Json(payload): Json<agent_diva_core::cron::UpdateCronJobRequest>,
788) -> Json<serde_json::Value> {
789    let (tx, rx) = oneshot::channel();
790    if let Err(e) = state
791        .api_tx
792        .send(ManagerCommand::UpdateCronJob(id, payload, tx))
793        .await
794    {
795        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
796    }
797    match rx.await {
798        Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
799        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
800        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
801    }
802}
803
804pub async fn set_cron_job_enabled_handler(
805    State(state): State<AppState>,
806    Path(id): Path<String>,
807    Json(payload): Json<SetCronJobEnabledRequest>,
808) -> Json<serde_json::Value> {
809    let (tx, rx) = oneshot::channel();
810    if let Err(e) = state
811        .api_tx
812        .send(ManagerCommand::SetCronJobEnabled(id, payload.enabled, tx))
813        .await
814    {
815        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
816    }
817    match rx.await {
818        Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
819        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
820        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
821    }
822}
823
824pub async fn run_cron_job_handler(
825    State(state): State<AppState>,
826    Path(id): Path<String>,
827    Json(payload): Json<RunCronJobRequest>,
828) -> Json<serde_json::Value> {
829    let (tx, rx) = oneshot::channel();
830    if let Err(e) = state
831        .api_tx
832        .send(ManagerCommand::RunCronJobNow(id, payload.force, tx))
833        .await
834    {
835        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
836    }
837    match rx.await {
838        Ok(Ok(job)) => Json(serde_json::json!({ "status": "ok", "job": job })),
839        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
840        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
841    }
842}
843
844pub async fn stop_cron_job_handler(
845    State(state): State<AppState>,
846    Path(id): Path<String>,
847) -> Json<serde_json::Value> {
848    let (tx, rx) = oneshot::channel();
849    if let Err(e) = state
850        .api_tx
851        .send(ManagerCommand::StopCronJobRun(id, tx))
852        .await
853    {
854        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
855    }
856    match rx.await {
857        Ok(Ok(run)) => Json(serde_json::json!({ "status": "ok", "run": run })),
858        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
859        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
860    }
861}
862
863pub async fn delete_cron_job_handler(
864    State(state): State<AppState>,
865    Path(id): Path<String>,
866) -> Json<serde_json::Value> {
867    let (tx, rx) = oneshot::channel();
868    if let Err(e) = state
869        .api_tx
870        .send(ManagerCommand::DeleteCronJob(id, tx))
871        .await
872    {
873        return Json(serde_json::json!({ "status": "error", "message": e.to_string() }));
874    }
875    match rx.await {
876        Ok(Ok(())) => Json(serde_json::json!({ "status": "ok" })),
877        Ok(Err(e)) => Json(serde_json::json!({ "status": "error", "message": e })),
878        Err(e) => Json(serde_json::json!({ "status": "error", "message": e.to_string() })),
879    }
880}