Skip to main content

solti_api/
adapter.rs

1//! # Supervisor adapter.
2//!
3//! [`SupervisorApiAdapter`] bridges [`SupervisorApi`](solti_core::SupervisorApi) to [`ApiHandler`].
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use solti_core::SupervisorApi;
9use solti_model::{OutputEvent, Task, TaskId, TaskPage, TaskQuery, TaskRun, TaskSpec};
10use tokio_stream::StreamExt;
11use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
12
13use crate::error::ApiError;
14use crate::handler::{ApiHandler, OutputEventStream};
15
16/// Adapter that bridges [`SupervisorApi`] to [`ApiHandler`].
17///
18/// Ready-to-use implementation that directly delegates to `SupervisorApi`.
19///
20/// ## Also
21///
22/// - [`ApiHandler`] the trait this adapter implements.
23/// - [`ApiError::Core`] wraps `CoreError` from the supervisor.
24pub struct SupervisorApiAdapter {
25    supervisor: Arc<SupervisorApi>,
26}
27
28impl SupervisorApiAdapter {
29    /// Create a new adapter wrapping the given supervisor.
30    pub fn new(supervisor: Arc<SupervisorApi>) -> Self {
31        Self { supervisor }
32    }
33}
34
35#[async_trait]
36impl ApiHandler for SupervisorApiAdapter {
37    async fn submit_task(&self, spec: TaskSpec) -> Result<TaskId, ApiError> {
38        self.supervisor.submit(&spec).await.map_err(ApiError::from)
39    }
40
41    async fn get_task_status(&self, id: &TaskId) -> Result<Option<Task>, ApiError> {
42        Ok(self.supervisor.get_task(id))
43    }
44
45    async fn query_tasks(&self, query: TaskQuery) -> Result<TaskPage<Task>, ApiError> {
46        Ok(self.supervisor.query_tasks(&query))
47    }
48
49    async fn list_task_runs(&self, id: &TaskId) -> Result<Vec<TaskRun>, ApiError> {
50        Ok(self.supervisor.list_task_runs(id))
51    }
52
53    async fn delete_task(&self, id: &TaskId) -> Result<(), ApiError> {
54        self.supervisor
55            .delete_task(id)
56            .await
57            .map_err(ApiError::from)
58    }
59
60    async fn stream_task_logs(&self, id: &TaskId) -> Result<OutputEventStream, ApiError> {
61        let receiver = self
62            .supervisor
63            .output_registry()
64            .subscribe(id)
65            .ok_or_else(|| ApiError::TaskNotFound(id.to_string()))?;
66
67        let stream = BroadcastStream::new(receiver).map(|res| {
68            res.unwrap_or_else(
69                |BroadcastStreamRecvError::Lagged(skipped)| OutputEvent::Lagged { skipped },
70            )
71        });
72        Ok(Box::pin(stream))
73    }
74}