cala_ledger/outbox/server/
mod.rs1#![allow(clippy::blocks_in_conditions)]
2mod config;
3mod convert;
4pub mod error;
5
6#[allow(clippy::all)]
7pub mod proto {
8 tonic::include_proto!("services.outbox.v1");
9}
10
11use futures::StreamExt;
12use proto::{outbox_service_server::OutboxService, *};
13use tonic::{transport::Server, Request, Response, Status};
14use tracing::instrument;
15
16use super::{EventSequence, Outbox};
17pub use config::*;
18use error::*;
19
20pub struct OutboxServer {
21 outbox: Outbox,
22}
23
24#[tonic::async_trait]
25impl OutboxService for OutboxServer {
26 type SubscribeStream = std::pin::Pin<
27 Box<dyn futures::Stream<Item = Result<CalaLedgerEvent, Status>> + Send + Sync + 'static>,
28 >;
29
30 #[instrument(name = "cala_ledger.subscribe", skip_all, fields(error, error.level, error.message), err)]
31 async fn subscribe(
32 &self,
33 request: Request<SubscribeRequest>,
34 ) -> Result<Response<Self::SubscribeStream>, Status> {
35 cala_tracing::grpc::extract_tracing(&request);
36
37 let SubscribeRequest { after_sequence } = request.into_inner();
38
39 let outbox_listener = self
40 .outbox
41 .register_listener(after_sequence.map(EventSequence::from))
42 .await
43 .map_err(|e| tonic::Status::internal(e.to_string()))?;
44 Ok(Response::new(Box::pin(
45 outbox_listener
46 .map(|event| Ok(proto::CalaLedgerEvent::from(event)))
47 .fuse(),
48 )))
49 }
50}
51
52pub(crate) async fn start(
53 server_config: OutboxServerConfig,
54 outbox: Outbox,
55) -> Result<(), OutboxServerError> {
56 let outbox_service = OutboxServer { outbox };
57 Server::builder()
58 .add_service(outbox_service_server::OutboxServiceServer::new(
59 outbox_service,
60 ))
61 .serve(([0, 0, 0, 0], server_config.listen_port).into())
62 .await?;
63 Ok(())
64}