Skip to main content

solti_api/
grpc.rs

1//! # gRPC transport.
2//!
3//! [`SoltiApiService`] implements the generated `SoltiApi` trait from `proto/solti/v1/api.proto`,
4//! delegating to an [`ApiHandler`](crate::ApiHandler).
5
6use std::sync::Arc;
7
8use tonic::{Request, Response, Status};
9use tracing::debug;
10
11use solti_model::TaskQuery;
12
13use crate::convert::{proto_to_domain_status, tasks_page_to_proto};
14use crate::error::ApiError;
15use crate::handler::ApiHandler;
16use crate::proto_api::{self, solti_api_server::SoltiApi, solti_api_server::SoltiApiServer};
17use crate::validate::{clamp_list_limit, non_empty_id};
18
/// gRPC front-end that forwards every RPC to a shared [`ApiHandler`](crate::ApiHandler).
///
/// ## Also
///
/// - `SoltiApiServer` generated tonic server wrapper.
/// - [`ApiError`](crate::ApiError) mapped to `tonic::Status`.
pub struct SoltiApiService<H> {
    /// Reference-counted handler that each RPC method calls into.
    handler: Arc<H>,
}
28
29impl<H> SoltiApiService<H>
30where
31    H: ApiHandler,
32{
33    /// Create a new gRPC service with the given handler.
34    pub fn new(handler: Arc<H>) -> Self {
35        Self { handler }
36    }
37}
38
39/// Build a configured `SoltiApiServer` ready to mount on a tonic server.
40///
41/// ## Example
42///
43/// ```rust,no_run
44/// # use std::sync::Arc;
45/// # use solti_api::{build_grpc_server, SupervisorApiAdapter};
46/// # async fn example(adapter: Arc<SupervisorApiAdapter>) -> Result<(), Box<dyn std::error::Error>> {
47/// let svc = build_grpc_server(adapter);
48/// tonic::transport::Server::builder()
49///     .add_service(svc)
50///     .serve("0.0.0.0:50052".parse()?)
51///     .await?;
52/// # Ok(()) }
53/// ```
54pub fn build_grpc_server<H>(handler: Arc<H>) -> SoltiApiServer<SoltiApiService<H>>
55where
56    H: ApiHandler,
57{
58    SoltiApiServer::new(SoltiApiService::new(handler))
59        .max_decoding_message_size(crate::MAX_REQUEST_BYTES)
60        .max_encoding_message_size(crate::MAX_REQUEST_BYTES)
61}
62
63#[tonic::async_trait]
64impl<H> SoltiApi for SoltiApiService<H>
65where
66    H: ApiHandler,
67{
68    async fn submit_task(
69        &self,
70        request: Request<proto_api::SubmitTaskRequest>,
71    ) -> Result<Response<proto_api::SubmitTaskResponse>, Status> {
72        let req = request.into_inner();
73
74        let spec = req
75            .spec
76            .ok_or_else(|| Status::invalid_argument("missing spec"))?;
77
78        let spec =
79            crate::convert::convert_create_spec(spec).map_err(|e: ApiError| Status::from(e))?;
80
81        debug!(slot = %spec.slot(), kind = ?spec.kind(), "grpc: submitting task");
82        let task_id = self.handler.submit_task(spec).await.map_err(Status::from)?;
83
84        Ok(Response::new(proto_api::SubmitTaskResponse {
85            task_id: task_id.to_string(),
86        }))
87    }
88
89    async fn get_task_status(
90        &self,
91        request: Request<proto_api::GetTaskStatusRequest>,
92    ) -> Result<Response<proto_api::GetTaskStatusResponse>, Status> {
93        let req = request.into_inner();
94
95        non_empty_id("task_id", &req.task_id).map_err(Status::from)?;
96
97        let task_id = solti_model::TaskId::from(req.task_id);
98        debug!(%task_id, "grpc: getting task status");
99
100        let info = self
101            .handler
102            .get_task_status(&task_id)
103            .await
104            .map_err(Status::from)?;
105
106        let task = info
107            .map(proto_api::TaskData::try_from)
108            .transpose()
109            .map_err(Status::from)?;
110
111        Ok(Response::new(proto_api::GetTaskStatusResponse { task }))
112    }
113
114    async fn list_tasks(
115        &self,
116        request: Request<proto_api::ListTasksRequest>,
117    ) -> Result<Response<proto_api::ListTasksResponse>, Status> {
118        let req = request.into_inner();
119
120        let mut query = TaskQuery::new();
121
122        if let Some(slot) = req.slot {
123            non_empty_id("slot", &slot).map_err(Status::from)?;
124            query = query.with_slot(slot);
125        }
126
127        if let Some(status_raw) = req.status {
128            let status = proto_to_domain_status(status_raw).map_err(Status::from)?;
129            query = query.with_status(status);
130        }
131
132        query = query.with_limit(clamp_list_limit(req.limit));
133        if req.offset > 0 {
134            query = query.with_offset(req.offset as usize);
135        }
136
137        let page = self
138            .handler
139            .query_tasks(query)
140            .await
141            .map_err(Status::from)?;
142
143        debug!(
144            count = page.items.len(),
145            total = page.total,
146            "grpc: tasks listed"
147        );
148
149        let response = tasks_page_to_proto(page).map_err(Status::from)?;
150        Ok(Response::new(response))
151    }
152
153    async fn list_task_runs(
154        &self,
155        request: Request<proto_api::ListTaskRunsRequest>,
156    ) -> Result<Response<proto_api::ListTaskRunsResponse>, Status> {
157        let req = request.into_inner();
158
159        non_empty_id("task_id", &req.task_id).map_err(Status::from)?;
160
161        let task_id = solti_model::TaskId::from(req.task_id);
162        debug!(%task_id, "grpc: listing task runs");
163
164        let runs = self
165            .handler
166            .list_task_runs(&task_id)
167            .await
168            .map_err(Status::from)?;
169
170        let runs = runs.into_iter().map(proto_api::TaskRunInfo::from).collect();
171
172        Ok(Response::new(proto_api::ListTaskRunsResponse { runs }))
173    }
174
175    async fn delete_task(
176        &self,
177        request: Request<proto_api::DeleteTaskRequest>,
178    ) -> Result<Response<proto_api::DeleteTaskResponse>, Status> {
179        let req = request.into_inner();
180
181        non_empty_id("task_id", &req.task_id).map_err(Status::from)?;
182
183        let task_id = solti_model::TaskId::from(req.task_id);
184        debug!(%task_id, "grpc: deleting task");
185
186        self.handler
187            .delete_task(&task_id)
188            .await
189            .map_err(Status::from)?;
190
191        debug!(%task_id, "grpc: task deleted");
192        Ok(Response::new(proto_api::DeleteTaskResponse {}))
193    }
194}