protosocket_rpc/server/
rpc_responder.rs1use std::future::Future;
2
3use crate::{
4 server::{
5 abortable::IdentifiableAbortable, abortion_tracker::AbortionTracker,
6 forward_streaming::ForwardAbortableStreamingRpc, forward_unary::ForwardAbortableUnaryRpc,
7 rpc_submitter::RpcResponse,
8 },
9 Message,
10};
11
12#[must_use]
15pub struct RpcResponder<'a, Response> {
16 outbound: &'a spillway::Sender<RpcResponse<Response>>,
17 aborts: &'a mut AbortionTracker,
18 message_id: u64,
19}
20impl<'a, Response> RpcResponder<'a, Response>
21where
22 Response: Message,
23{
24 pub(crate) fn new_responder_reference(
25 outbound: &'a spillway::Sender<RpcResponse<Response>>,
26 aborts: &'a mut AbortionTracker,
27 message_id: u64,
28 ) -> Self {
29 Self {
30 outbound,
31 aborts,
32 message_id,
33 }
34 }
35
36 pub fn unary(self, unary_rpc: impl Future<Output = Response>) -> impl Future<Output = ()> {
38 let (abortable, abort) = IdentifiableAbortable::new(unary_rpc);
39 self.aborts.register(self.message_id, abort);
40 ForwardAbortableUnaryRpc::new(abortable, self.message_id, self.outbound.clone())
41 }
42
43 pub fn stream(
45 self,
46 streaming_rpc: impl futures::Stream<Item = Response>,
47 ) -> impl Future<Output = ()> {
48 let (abortable_stream, abort) = IdentifiableAbortable::new(streaming_rpc);
49 self.aborts.register(self.message_id, abort);
50 ForwardAbortableStreamingRpc::new(abortable_stream, self.message_id, self.outbound.clone())
51 }
52
53 pub fn immediate(self, response: Response) {
58 if self
59 .outbound
60 .send(RpcResponse::Untracked(response))
61 .is_err()
62 {
63 log::debug!("outbound channel closed while sending response");
64 }
65 }
66}