cala_ledger/outbox/server/
mod.rs

1#![allow(clippy::blocks_in_conditions)]
2mod config;
3mod convert;
4pub mod error;
5
// Clippy is silenced for this entire module because its contents are not
// hand-written: `tonic::include_proto!` pulls in code generated at build time.
#[allow(clippy::all)]
pub mod proto {
    // Generated items for the `services.outbox.v1` protobuf package.
    tonic::include_proto!("services.outbox.v1");
}
10
11use futures::StreamExt;
12use proto::{outbox_service_server::OutboxService, *};
13use tonic::{transport::Server, Request, Response, Status};
14use tracing::instrument;
15
16use super::ObixOutbox;
17pub use config::*;
18use error::*;
19
/// gRPC handler type that implements the generated `OutboxService` trait.
pub struct OutboxServer {
    /// Outbox handle; `subscribe` calls `listen_persisted` on it for every request.
    outbox: ObixOutbox,
}
23
24#[tonic::async_trait]
25impl OutboxService for OutboxServer {
26    type SubscribeStream = std::pin::Pin<
27        Box<dyn futures::Stream<Item = Result<CalaLedgerEvent, Status>> + Send + Sync + 'static>,
28    >;
29
30    #[instrument(name = "cala_ledger.subscribe", skip_all, fields(error, error.level, error.message))]
31    async fn subscribe(
32        &self,
33        request: Request<SubscribeRequest>,
34    ) -> Result<Response<Self::SubscribeStream>, Status> {
35        cala_tracing::grpc::extract_tracing(&request);
36
37        let SubscribeRequest { after_sequence } = request.into_inner();
38
39        let listener = self
40            .outbox
41            .listen_persisted(after_sequence.map(obix::EventSequence::from));
42
43        Ok(Response::new(Box::pin(
44            listener
45                .map(|event| Ok(proto::CalaLedgerEvent::from((*event).clone())))
46                .fuse(),
47        )))
48    }
49}
50
51#[instrument(name = "cala_ledger.outbox_server.start", skip(outbox))]
52pub(crate) async fn start(
53    server_config: OutboxServerConfig,
54    outbox: ObixOutbox,
55) -> Result<(), OutboxServerError> {
56    let outbox_service = OutboxServer { outbox };
57    tracing::info!(
58        "Outbox server started on port {}",
59        server_config.listen_port
60    );
61    Server::builder()
62        .add_service(outbox_service_server::OutboxServiceServer::new(
63            outbox_service,
64        ))
65        .serve(([0, 0, 0, 0], server_config.listen_port).into())
66        .await?;
67    tracing::info!(
68        "Outbox server stopped on port {}",
69        server_config.listen_port
70    );
71    Ok(())
72}