rama_tcp/client/service/
select.rs1use rama_core::error::BoxError;
2use rama_core::Context;
3use std::fmt;
4use std::{convert::Infallible, future::Future, sync::Arc};
5
6use crate::client::TcpStreamConnector;
7
8pub struct CreatedTcpStreamConnector<State, Connector> {
11 pub ctx: Context<State>,
12 pub connector: Connector,
13}
14
15impl<State, Connector> fmt::Debug for CreatedTcpStreamConnector<State, Connector>
16where
17 State: fmt::Debug,
18 Connector: fmt::Debug,
19{
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 f.debug_struct("CreatedTcpStreamConnector")
22 .field("ctx", &self.ctx)
23 .field("connector", &self.connector)
24 .finish()
25 }
26}
27
28impl<State, Connector> Clone for CreatedTcpStreamConnector<State, Connector>
29where
30 State: Clone,
31 Connector: Clone,
32{
33 fn clone(&self) -> Self {
34 Self {
35 ctx: self.ctx.clone(),
36 connector: self.connector.clone(),
37 }
38 }
39}
40
41pub trait TcpStreamConnectorFactory<State>: Send + Sync + 'static {
59 type Connector: TcpStreamConnector;
61 type Error;
64
65 fn make_connector(
67 &self,
68 ctx: Context<State>,
69 ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
70 + Send
71 + '_;
72}
73
74impl<State: Send + Sync + 'static> TcpStreamConnectorFactory<State> for () {
75 type Connector = ();
76 type Error = Infallible;
77
78 fn make_connector(
79 &self,
80 ctx: Context<State>,
81 ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
82 + Send
83 + '_ {
84 std::future::ready(Ok(CreatedTcpStreamConnector { ctx, connector: () }))
85 }
86}
87
88pub struct TcpStreamConnectorCloneFactory<C>(pub(super) C);
95
96impl<C> fmt::Debug for TcpStreamConnectorCloneFactory<C>
97where
98 C: fmt::Debug,
99{
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_tuple("TcpStreamConnectorCloneFactory")
102 .field(&self.0)
103 .finish()
104 }
105}
106
107impl<C> Clone for TcpStreamConnectorCloneFactory<C>
108where
109 C: Clone,
110{
111 fn clone(&self) -> Self {
112 Self(self.0.clone())
113 }
114}
115
116impl<State, C> TcpStreamConnectorFactory<State> for TcpStreamConnectorCloneFactory<C>
117where
118 C: TcpStreamConnector + Clone,
119 State: Send + Sync + 'static,
120{
121 type Connector = C;
122 type Error = Infallible;
123
124 fn make_connector(
125 &self,
126 ctx: Context<State>,
127 ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
128 + Send
129 + '_ {
130 std::future::ready(Ok(CreatedTcpStreamConnector {
131 ctx,
132 connector: self.0.clone(),
133 }))
134 }
135}
136
137impl<State, F> TcpStreamConnectorFactory<State> for Arc<F>
138where
139 F: TcpStreamConnectorFactory<State>,
140 State: Send + Sync + 'static,
141{
142 type Connector = F::Connector;
143 type Error = F::Error;
144
145 fn make_connector(
146 &self,
147 ctx: Context<State>,
148 ) -> impl Future<Output = Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error>>
149 + Send
150 + '_ {
151 (**self).make_connector(ctx)
152 }
153}
154
155macro_rules! impl_stream_connector_factory_either {
156 ($id:ident, $($param:ident),+ $(,)?) => {
157 impl<State, $($param),+> TcpStreamConnectorFactory<State> for ::rama_core::combinators::$id<$($param),+>
158 where
159 State: Send + Sync + 'static,
160 $(
161 $param: TcpStreamConnectorFactory<State, Connector: TcpStreamConnector<Error: Into<BoxError>>, Error: Into<BoxError>>,
162 )+
163 {
164 type Connector = ::rama_core::combinators::$id<$($param::Connector),+>;
165 type Error = BoxError;
166
167 async fn make_connector(
168 &self,
169 ctx: Context<State>,
170 ) -> Result<CreatedTcpStreamConnector<State, Self::Connector>, Self::Error> {
171 match self {
172 $(
173 ::rama_core::combinators::$id::$param(s) => match s.make_connector(ctx).await {
174 Err(e) => Err(e.into()),
175 Ok(CreatedTcpStreamConnector{ ctx, connector }) => Ok(CreatedTcpStreamConnector{
176 ctx,
177 connector: ::rama_core::combinators::$id::$param(connector),
178 }),
179 },
180 )+
181 }
182 }
183 }
184 };
185}
186
187::rama_core::combinators::impl_either!(impl_stream_connector_factory_either);