rama_tcp/client/service/
select.rs

1use rama_core::error::BoxError;
2use rama_core::Context;
3use std::fmt;
4use std::{convert::Infallible, future::Future, sync::Arc};
5
6use crate::client::TcpStreamConnector;
7
/// Contains a `Connector` created by a [`TcpStreamConnectorFactory`],
/// together with the [`Context`] returned by the factory alongside it.
pub struct CreatedTcpStreamConnector<State, Connector> {
    /// The [`Context`] handed back by the factory together with the connector.
    pub ctx: Context<State>,
    /// The connector that was created by the factory.
    pub connector: Connector,
}
14
15impl<State, Connector> fmt::Debug for CreatedTcpStreamConnector<State, Connector>
16where
17    State: fmt::Debug,
18    Connector: fmt::Debug,
19{
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("CreatedTcpStreamConnector")
22            .field("ctx", &self.ctx)
23            .field("connector", &self.connector)
24            .finish()
25    }
26}
27
28impl<State, Connector> Clone for CreatedTcpStreamConnector<State, Connector>
29where
30    State: Clone,
31    Connector: Clone,
32{
33    fn clone(&self) -> Self {
34        Self {
35            ctx: self.ctx.clone(),
36            connector: self.connector.clone(),
37        }
38    }
39}
40
/// Factory to create a [`TcpStreamConnector`]. This is used by the TCP
/// stream service to create a stream within a specific [`Context`].
///
/// In the simplest case you use a [`TcpStreamConnectorCloneFactory`]
/// to use a [`Clone`]able [`TcpStreamConnector`], but in more
/// advanced cases you can use variants of [`TcpStreamConnector`] specific
/// to the given contexts.
///
/// Examples of why you might need variants:
///
/// - you might have specific needs for your sockets (e.g. bind to a specific interface)
///   that you do not have for all your egress traffic. A crate such as [`socket2`]
///   can help you with this;
/// - it is possible that you have specific filter or firewall needs for some of your
///   egress traffic but not all of it.
///
/// [`socket2`]: https://docs.rs/socket2
pub trait TcpStreamConnectorFactory<State>: Send + Sync + 'static {
    /// `TcpStreamConnector` created by this [`TcpStreamConnectorFactory`]
    type Connector: TcpStreamConnector;
    /// Error returned in case [`TcpStreamConnectorFactory`] was
    /// not able to create a [`TcpStreamConnector`].
    type Error;

    /// Try to create a [`TcpStreamConnector`] for the given [`Context`].
    ///
    /// The [`Context`] is taken by value; on success a [`Context`] is handed
    /// back to the caller inside the returned [`CreatedTcpStreamConnector`],
    /// next to the created connector. On failure [`Self::Error`] is returned.
    ///
    /// The returned future is [`Send`] and is allowed to borrow from `self`.
    fn make_connector(
        &self,
        ctx: Context<State>,
    ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
           + Send
           + '_;
}
73
74impl<State: Send + Sync + 'static> TcpStreamConnectorFactory<State> for () {
75    type Connector = ();
76    type Error = Infallible;
77
78    fn make_connector(
79        &self,
80        ctx: Context<State>,
81    ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
82           + Send
83           + '_ {
84        std::future::ready(Ok(CreatedTcpStreamConnector { ctx, connector: () }))
85    }
86}
87
/// Utility implementation of a [`TcpStreamConnectorFactory`] which allows
/// one to use a [`Clone`]able [`TcpStreamConnector`] as a [`TcpStreamConnectorFactory`]:
/// each created connector is a clone of the wrapped connector.
///
/// The wrapped connector is only accessible from the parent module (`pub(super)`),
/// so this struct cannot be created by third party crates
/// and instead is to be used via other APIs provided by this crate.
pub struct TcpStreamConnectorCloneFactory<C>(pub(super) C);
95
96impl<C> fmt::Debug for TcpStreamConnectorCloneFactory<C>
97where
98    C: fmt::Debug,
99{
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_tuple("TcpStreamConnectorCloneFactory")
102            .field(&self.0)
103            .finish()
104    }
105}
106
107impl<C> Clone for TcpStreamConnectorCloneFactory<C>
108where
109    C: Clone,
110{
111    fn clone(&self) -> Self {
112        Self(self.0.clone())
113    }
114}
115
116impl<State, C> TcpStreamConnectorFactory<State> for TcpStreamConnectorCloneFactory<C>
117where
118    C: TcpStreamConnector + Clone,
119    State: Send + Sync + 'static,
120{
121    type Connector = C;
122    type Error = Infallible;
123
124    fn make_connector(
125        &self,
126        ctx: Context<State>,
127    ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
128           + Send
129           + '_ {
130        std::future::ready(Ok(CreatedTcpStreamConnector {
131            ctx,
132            connector: self.0.clone(),
133        }))
134    }
135}
136
137impl<State, F> TcpStreamConnectorFactory<State> for Arc<F>
138where
139    F: TcpStreamConnectorFactory<State>,
140    State: Send + Sync + 'static,
141{
142    type Connector = F::Connector;
143    type Error = F::Error;
144
145    fn make_connector(
146        &self,
147        ctx: Context<State>,
148    ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
149           + Send
150           + '_ {
151        (**self).make_connector(ctx)
152    }
153}
154
155macro_rules! impl_stream_connector_factory_either {
156    ($id:ident, $($param:ident),+ $(,)?) => {
157        impl<State, $($param),+> TcpStreamConnectorFactory<State> for ::rama_core::combinators::$id<$($param),+>
158        where
159            State: Send + Sync + 'static,
160            $(
161                $param: TcpStreamConnectorFactory<State, Connector: TcpStreamConnector<Error: Into<BoxError>>, Error: Into<BoxError>>,
162            )+
163        {
164            type Connector = ::rama_core::combinators::$id<$($param::Connector),+>;
165            type Error = BoxError;
166
167            async fn make_connector(
168                &self,
169                ctx: Context<State>,
170            ) -> Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error> {
171                match self {
172                    $(
173                        ::rama_core::combinators::$id::$param(s) => match s.make_connector(ctx).await {
174                            Err(e) => Err(e.into()),
175                            Ok(CreatedTcpStreamConnector{ ctx, connector }) => Ok(CreatedTcpStreamConnector{
176                                ctx,
177                                connector: ::rama_core::combinators::$id::$param(connector),
178                            }),
179                        },
180                    )+
181                }
182            }
183        }
184    };
185}
186
187::rama_core::combinators::impl_either!(impl_stream_connector_factory_either);