flare_dht/pool/
mod.rs

1mod conn;
2mod grpc;
3
4use conn::{ConnFactory, ConnManager};
5use flare_pb::{
6    flare_control_client::FlareControlClient, flare_kv_client::FlareKvClient,
7};
8use grpc::GrpcClientManager;
9use std::sync::Arc;
10use tonic::transport::Channel;
11
12use crate::{error::FlareInternalError, NodeId};
13
14pub type ControlPool =
15    ConnManager<NodeId, GrpcClientManager<FlareControlClient<Channel>>>;
16pub type DataPool =
17    ConnManager<NodeId, GrpcClientManager<FlareKvClient<Channel>>>;
18
19pub fn create_control_pool(resolver: Arc<dyn AddrResolver>) -> ControlPool {
20    let factory = Arc::new(GrpcConnFactory { resolver });
21    return ConnManager::new(factory);
22}
23
24pub fn create_data_pool(resolver: Arc<dyn AddrResolver>) -> DataPool {
25    let factory = Arc::new(GrpcConnFactory { resolver });
26    return ConnManager::new(factory);
27}
28
29#[async_trait::async_trait]
30pub trait AddrResolver: Send + Sync {
31    async fn resolve(&self, node_id: NodeId) -> Option<String>;
32}
33
34struct GrpcConnFactory {
35    resolver: Arc<dyn AddrResolver>,
36}
37
38#[async_trait::async_trait]
39impl ConnFactory<NodeId, GrpcClientManager<FlareKvClient<Channel>>>
40    for GrpcConnFactory
41{
42    async fn create(
43        &self,
44        node_id: NodeId,
45    ) -> Result<GrpcClientManager<FlareKvClient<Channel>>, FlareInternalError>
46    {
47        if let Some(addr) = self.resolver.resolve(node_id).await {
48            GrpcClientManager::new(&addr, FlareKvClient::new)
49        } else {
50            Err(FlareInternalError::NoSuchNode(node_id))
51        }
52    }
53}
54
55#[async_trait::async_trait]
56impl ConnFactory<NodeId, GrpcClientManager<FlareControlClient<Channel>>>
57    for GrpcConnFactory
58{
59    async fn create(
60        &self,
61        node_id: NodeId,
62    ) -> Result<
63        GrpcClientManager<FlareControlClient<Channel>>,
64        FlareInternalError,
65    > {
66        if let Some(addr) = self.resolver.resolve(node_id).await {
67            GrpcClientManager::new(&addr, FlareControlClient::new)
68        } else {
69            Err(FlareInternalError::NoSuchNode(node_id))
70        }
71    }
72}