Skip to main content

protosocket_rpc/server/
rpc_responder.rs

1use std::future::Future;
2
3use crate::{
4    server::{
5        abortable::IdentifiableAbortable, abortion_tracker::AbortionTracker,
6        forward_streaming::ForwardAbortableStreamingRpc, forward_unary::ForwardAbortableUnaryRpc,
7        rpc_submitter::RpcResponse,
8    },
9    Message,
10};
11
12/// A request context's temporary lease to an RPC Reactor's state.
13/// You want to consume your RpcResponder as quickly as possible.
14#[must_use]
15pub struct RpcResponder<'a, Response> {
16    outbound: &'a spillway::Sender<RpcResponse<Response>>,
17    aborts: &'a mut AbortionTracker,
18    message_id: u64,
19}
20impl<'a, Response> RpcResponder<'a, Response>
21where
22    Response: Message,
23{
24    pub(crate) fn new_responder_reference(
25        outbound: &'a spillway::Sender<RpcResponse<Response>>,
26        aborts: &'a mut AbortionTracker,
27        message_id: u64,
28    ) -> Self {
29        Self {
30            outbound,
31            aborts,
32            message_id,
33        }
34    }
35
36    /// Consume the responder by providing a future that will materialize the response to this request.
37    pub fn unary(self, unary_rpc: impl Future<Output = Response>) -> impl Future<Output = ()> {
38        let (abortable, abort) = IdentifiableAbortable::new(unary_rpc);
39        self.aborts.register(self.message_id, abort);
40        ForwardAbortableUnaryRpc::new(abortable, self.message_id, self.outbound.clone())
41    }
42
43    /// Consume the responder by providing a stream that will materialize the response to this request.
44    pub fn stream(
45        self,
46        streaming_rpc: impl futures::Stream<Item = Response>,
47    ) -> impl Future<Output = ()> {
48        let (abortable_stream, abort) = IdentifiableAbortable::new(streaming_rpc);
49        self.aborts.register(self.message_id, abort);
50        ForwardAbortableStreamingRpc::new(abortable_stream, self.message_id, self.outbound.clone())
51    }
52
53    /// Consume the responder by providing an immediate response.
54    ///
55    /// This is the cheapest, fastest way to respond, but you must only use it when you can get a response
56    /// without blocking!
57    pub fn immediate(self, response: Response) {
58        if self
59            .outbound
60            .send(RpcResponse::Untracked(response))
61            .is_err()
62        {
63            log::debug!("outbound channel closed while sending response");
64        }
65    }
66}