1mod conn;
2mod grpc;
3
4use conn::{ConnFactory, ConnManager};
5use flare_pb::{
6 flare_control_client::FlareControlClient, flare_kv_client::FlareKvClient,
7};
8use grpc::GrpcClientManager;
9use std::sync::Arc;
10use tonic::transport::Channel;
11
12use crate::{error::FlareInternalError, NodeId};
13
14pub type ControlPool =
15 ConnManager<NodeId, GrpcClientManager<FlareControlClient<Channel>>>;
16pub type DataPool =
17 ConnManager<NodeId, GrpcClientManager<FlareKvClient<Channel>>>;
18
19pub fn create_control_pool(resolver: Arc<dyn AddrResolver>) -> ControlPool {
20 let factory = Arc::new(GrpcConnFactory { resolver });
21 return ConnManager::new(factory);
22}
23
24pub fn create_data_pool(resolver: Arc<dyn AddrResolver>) -> DataPool {
25 let factory = Arc::new(GrpcConnFactory { resolver });
26 return ConnManager::new(factory);
27}
28
29#[async_trait::async_trait]
30pub trait AddrResolver: Send + Sync {
31 async fn resolve(&self, node_id: NodeId) -> Option<String>;
32}
33
34struct GrpcConnFactory {
35 resolver: Arc<dyn AddrResolver>,
36}
37
38#[async_trait::async_trait]
39impl ConnFactory<NodeId, GrpcClientManager<FlareKvClient<Channel>>>
40 for GrpcConnFactory
41{
42 async fn create(
43 &self,
44 node_id: NodeId,
45 ) -> Result<GrpcClientManager<FlareKvClient<Channel>>, FlareInternalError>
46 {
47 if let Some(addr) = self.resolver.resolve(node_id).await {
48 GrpcClientManager::new(&addr, FlareKvClient::new)
49 } else {
50 Err(FlareInternalError::NoSuchNode(node_id))
51 }
52 }
53}
54
55#[async_trait::async_trait]
56impl ConnFactory<NodeId, GrpcClientManager<FlareControlClient<Channel>>>
57 for GrpcConnFactory
58{
59 async fn create(
60 &self,
61 node_id: NodeId,
62 ) -> Result<
63 GrpcClientManager<FlareControlClient<Channel>>,
64 FlareInternalError,
65 > {
66 if let Some(addr) = self.resolver.resolve(node_id).await {
67 GrpcClientManager::new(&addr, FlareControlClient::new)
68 } else {
69 Err(FlareInternalError::NoSuchNode(node_id))
70 }
71 }
72}