use arcbox_grpc::v1::migration_service_server;
use arcbox_protocol::v1::{
PrepareMigrationRequest, PrepareMigrationResponse, RunMigrationEvent, RunMigrationRequest,
};
use std::pin::Pin;
use tokio_stream::Stream;
use tokio_stream::StreamExt as _;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status};
use crate::error::ApiError;
use crate::grpc::SharedRuntime;
trait SharedRuntimeReady {
#[allow(clippy::result_large_err)]
fn ready(&self) -> Result<&std::sync::Arc<arcbox_core::Runtime>, Status>;
}
impl SharedRuntimeReady for SharedRuntime {
#[allow(clippy::result_large_err)]
fn ready(&self) -> Result<&std::sync::Arc<arcbox_core::Runtime>, Status> {
self.get()
.ok_or_else(|| Status::unavailable("daemon is starting, runtime not ready yet"))
}
}
pub struct MigrationServiceImpl {
runtime: SharedRuntime,
}
impl MigrationServiceImpl {
#[must_use]
pub fn new(runtime: SharedRuntime) -> Self {
Self { runtime }
}
}
#[tonic::async_trait]
impl migration_service_server::MigrationService for MigrationServiceImpl {
async fn prepare_migration(
&self,
request: Request<PrepareMigrationRequest>,
) -> Result<Response<PrepareMigrationResponse>, Status> {
let response = self
.runtime
.ready()?
.migration_manager()
.prepare_migration(request.into_inner())
.await
.map_err(ApiError::from)?;
Ok(Response::new(response))
}
type RunMigrationStream =
Pin<Box<dyn Stream<Item = Result<RunMigrationEvent, Status>> + Send + 'static>>;
async fn run_migration(
&self,
request: Request<RunMigrationRequest>,
) -> Result<Response<Self::RunMigrationStream>, Status> {
let receiver = self
.runtime
.ready()?
.migration_manager()
.run_migration(request.into_inner())
.await
.map_err(ApiError::from)?;
#[allow(clippy::result_large_err)]
fn stream_status(
result: arcbox_core::Result<RunMigrationEvent>,
) -> Result<RunMigrationEvent, Status> {
result.map_err(|e| Status::internal(e.to_string()))
}
let stream = UnboundedReceiverStream::new(receiver).map(stream_status);
Ok(Response::new(Box::pin(stream)))
}
}