cala_ledger/outbox/server/
mod.rs1#![allow(clippy::blocks_in_conditions)]
2mod config;
3mod convert;
4pub mod error;
5
/// Protobuf/gRPC types for the `services.outbox.v1` package.
///
/// The module body is produced entirely by `tonic::include_proto!`; the
/// blanket `clippy::all` allow is presumably there because the included code
/// is generated rather than hand-written.
6#[allow(clippy::all)]
7pub mod proto {
    // Pulls in the code generated at build time for this protobuf package.
8 tonic::include_proto!("services.outbox.v1");
9}
10
11use futures::StreamExt;
12use proto::{outbox_service_server::OutboxService, *};
13use tonic::{transport::Server, Request, Response, Status};
14use tracing::instrument;
15
16use super::ObixOutbox;
17pub use config::*;
18use error::*;
19
/// gRPC service state for the outbox endpoint.
///
/// Implements `OutboxService`; `subscribe` calls `listen_persisted` on the
/// wrapped outbox to obtain the events it streams to clients.
20pub struct OutboxServer {
    /// Outbox handle that `subscribe` listens on.
21 outbox: ObixOutbox,
22}
23
24#[tonic::async_trait]
25impl OutboxService for OutboxServer {
26 type SubscribeStream = std::pin::Pin<
27 Box<dyn futures::Stream<Item = Result<CalaLedgerEvent, Status>> + Send + Sync + 'static>,
28 >;
29
30 #[instrument(name = "cala_ledger.subscribe", skip_all, fields(error, error.level, error.message))]
31 async fn subscribe(
32 &self,
33 request: Request<SubscribeRequest>,
34 ) -> Result<Response<Self::SubscribeStream>, Status> {
35 cala_tracing::grpc::extract_tracing(&request);
36
37 let SubscribeRequest { after_sequence } = request.into_inner();
38
39 let listener = self
40 .outbox
41 .listen_persisted(after_sequence.map(obix::EventSequence::from));
42
43 Ok(Response::new(Box::pin(
44 listener
45 .map(|event| Ok(proto::CalaLedgerEvent::from((*event).clone())))
46 .fuse(),
47 )))
48 }
49}
50
51#[instrument(name = "cala_ledger.outbox_server.start", skip(outbox))]
52pub(crate) async fn start(
53 server_config: OutboxServerConfig,
54 outbox: ObixOutbox,
55) -> Result<(), OutboxServerError> {
56 let outbox_service = OutboxServer { outbox };
57 tracing::info!(
58 "Outbox server started on port {}",
59 server_config.listen_port
60 );
61 Server::builder()
62 .add_service(outbox_service_server::OutboxServiceServer::new(
63 outbox_service,
64 ))
65 .serve(([0, 0, 0, 0], server_config.listen_port).into())
66 .await?;
67 tracing::info!(
68 "Outbox server stopped on port {}",
69 server_config.listen_port
70 );
71 Ok(())
72}