1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#![deny(missing_docs)]
use crate::connect::MakeConnection;
use crate::error::KafkaError;
use crate::transport::{KafkaTransportService, MakeClient, TransportClient};
use bytes::BytesMut;
use futures::future::Future;
use kafka_protocol::messages::{RequestHeader, ResponseHeader};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, Message, Request};
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
pub mod connect;
pub mod error;
pub mod transport;
pub struct KafkaService<Svc> {
inner: Svc,
}
impl<Svc> KafkaService<Svc> {
pub fn new(inner: Svc) -> Self {
Self { inner }
}
fn encode<Req>(req: KafkaRequest<Req>) -> Result<BytesMut, KafkaError>
where
Req: Message + HeaderVersion + Encodable,
{
let version = req.0.request_api_version;
let mut bytes = BytesMut::new();
req.0
.encode(&mut bytes, <Req as HeaderVersion>::header_version(version))?;
req.1.encode(&mut bytes, version)?;
Ok(bytes)
}
fn decode<Res>(mut bytes: BytesMut, version: i16) -> Result<KafkaResponse<Res>, KafkaError>
where
Res: Message + HeaderVersion + Decodable,
{
let header =
ResponseHeader::decode(&mut bytes, <Res as HeaderVersion>::header_version(version))?;
let response = <Res as Decodable>::decode(&mut bytes, version)?;
Ok((header, response))
}
}
pub struct MakeService<C> {
connection: C,
}
impl<C> MakeService<C>
where
C: MakeConnection + 'static,
{
pub fn new(connection: C) -> Self {
Self { connection }
}
pub async fn into_service(
self,
) -> Result<KafkaService<KafkaTransportService<TransportClient<C::Connection>>>, C::Error> {
let client = MakeClient::with_connection(self.connection)
.into_client()
.await?;
let transport = KafkaTransportService::new(client);
Ok(KafkaService::new(transport))
}
}
pub type KafkaRequest<Req> = (RequestHeader, Req);
pub type KafkaResponse<Res> = (ResponseHeader, Res);
impl<Req, Svc> Service<KafkaRequest<Req>> for KafkaService<Svc>
where
Req: Request + Message + Encodable + HeaderVersion,
Svc: Service<BytesMut, Response = BytesMut>,
<Svc as Service<BytesMut>>::Error: Into<KafkaError>,
<Svc as Service<BytesMut>>::Future: 'static,
{
type Response = KafkaResponse<Req::Response>;
type Error = KafkaError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| e.into())
}
fn call(&mut self, req: KafkaRequest<Req>) -> Self::Future {
let version = req.0.request_api_version;
let encoded = Self::encode(req).unwrap();
let fut = self.inner.call(encoded);
Box::pin(async move {
let res_bytes = fut.await.map_err(|e| e.into())?;
let response = Self::decode(res_bytes, version)?;
Ok(response)
})
}
}