1use std::sync::Arc;
6
7use async_trait::async_trait;
8use solti_core::SupervisorApi;
9use solti_model::{OutputEvent, Task, TaskId, TaskPage, TaskQuery, TaskRun, TaskSpec};
10use tokio_stream::StreamExt;
11use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
12
13use crate::error::ApiError;
14use crate::handler::{ApiHandler, OutputEventStream};
15
16pub struct SupervisorApiAdapter {
25 supervisor: Arc<SupervisorApi>,
26}
27
28impl SupervisorApiAdapter {
29 pub fn new(supervisor: Arc<SupervisorApi>) -> Self {
31 Self { supervisor }
32 }
33}
34
35#[async_trait]
36impl ApiHandler for SupervisorApiAdapter {
37 async fn submit_task(&self, spec: TaskSpec) -> Result<TaskId, ApiError> {
38 self.supervisor.submit(&spec).await.map_err(ApiError::from)
39 }
40
41 async fn get_task_status(&self, id: &TaskId) -> Result<Option<Task>, ApiError> {
42 Ok(self.supervisor.get_task(id))
43 }
44
45 async fn query_tasks(&self, query: TaskQuery) -> Result<TaskPage<Task>, ApiError> {
46 Ok(self.supervisor.query_tasks(&query))
47 }
48
49 async fn list_task_runs(&self, id: &TaskId) -> Result<Vec<TaskRun>, ApiError> {
50 Ok(self.supervisor.list_task_runs(id))
51 }
52
53 async fn delete_task(&self, id: &TaskId) -> Result<(), ApiError> {
54 self.supervisor
55 .delete_task(id)
56 .await
57 .map_err(ApiError::from)
58 }
59
60 async fn stream_task_logs(&self, id: &TaskId) -> Result<OutputEventStream, ApiError> {
61 let receiver = self
62 .supervisor
63 .output_registry()
64 .subscribe(id)
65 .ok_or_else(|| ApiError::TaskNotFound(id.to_string()))?;
66
67 let stream = BroadcastStream::new(receiver).map(|res| {
68 res.unwrap_or_else(
69 |BroadcastStreamRecvError::Lagged(skipped)| OutputEvent::Lagged { skipped },
70 )
71 });
72 Ok(Box::pin(stream))
73 }
74}