flare_cli/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use flare_dht::cli::{
    CollectionOperation, FlareCli, FlareCommands, ServerArgs,
};
use flare_dht::metadata::raft::FlareMetadataManager;
use flare_dht::metadata::MetadataManager;
use flare_dht::proto::flare_control_server::FlareControlServer;
use flare_dht::proto::flare_kv_client::FlareKvClient;
use flare_dht::proto::flare_kv_server::FlareKvServer;
use flare_dht::proto::CreateCollectionRequest;
use flare_dht::rpc_server::control_api::FlareControlService;
use flare_dht::rpc_server::kv_api::FlareKvService;
use flare_dht::shard::{HashMapShard, HashMapShardFactory, ShardManager};
use flare_dht::FlareNode;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tonic::transport::Server;
use tonic::Request;
use tracing::info;

pub async fn start_server(
    options: ServerArgs,
) -> Result<Arc<FlareNode<HashMapShard>>, Box<dyn Error>> {
    info!("use option {options:?}");

    let node_id = options.get_node_id();
    info!("use node_id: {node_id}");

    let z_session = zenoh::open(zenoh::Config::default()).await.unwrap();
    let prefix = format!("flare/{}/nodes", options.cluster_id);

    let metadata_manager: Arc<FlareMetadataManager> = Arc::new(
        FlareMetadataManager::new(
            node_id,
            options.get_addr(),
            options.clone(),
            z_session.clone(),
            &prefix,
        )
        .await,
    );
    metadata_manager.initialize().await?;
    let shard_manager =
        Arc::new(ShardManager::new(Box::new(HashMapShardFactory {})));
    let flare_node = FlareNode::new(
        options.get_addr(),
        node_id,
        metadata_manager.clone(),
        shard_manager,
        // metadata_manager.control_pool.clone(),
        // metadata_manager.data_pool.clone(),
    )
    .await;

    let shared_node = Arc::new(flare_node);
    let flare_node = shared_node.clone();
    flare_node.start_watch_stream();
    let flare_kv = FlareKvService::new(shared_node.clone());
    let flare_control = FlareControlService::new(metadata_manager.clone());

    // let socket: SocketAddr = options.addr.parse()?;
    if !options.not_server {
        let socket = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
            options.port,
        );
        info!("start on {}", socket);
        let reflection_server_v1a =
            tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(
                    flare_dht::proto::FILE_DESCRIPTOR_SET,
                )
                .build_v1alpha()
                .unwrap();

        let reflection_server_v1 =
            tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(
                    flare_dht::proto::FILE_DESCRIPTOR_SET,
                )
                .build_v1()
                .unwrap();

        if let Some(addr) = options.peer_addr {
            let flare_node = shared_node.clone();
            tokio::spawn(async move {
                let node = flare_node.clone();
                node.join(&addr).await.unwrap()
            });
        };

        tokio::spawn(async move {
            Server::builder()
                .add_service(reflection_server_v1a)
                .add_service(reflection_server_v1)
                .add_service(FlareKvServer::new(flare_kv))
                .add_service(FlareControlServer::new(flare_control))
                .serve(socket)
                .await
                .unwrap();
        });
    }

    Ok(shared_node)
}

pub async fn handle_cli(command: FlareCli) -> Result<(), Box<dyn Error>> {
    match command.command {
        FlareCommands::Server(server_args) => handle_server(server_args).await,
        FlareCommands::Collection { opt } => handle_collection(opt).await,
    }
}

async fn handle_server(server_args: ServerArgs) -> Result<(), Box<dyn Error>> {
    let flare_node = start_server(server_args).await?;

    match tokio::signal::ctrl_c().await {
        Ok(()) => {}
        Err(err) => {
            eprintln!("Unable to listen for shutdown signal: {}", err);
            // we also shut down in case of error
        }
    }
    info!("starting a clean up for shutdown");
    flare_node.leave().await;
    info!("done clean up");
    Ok(())
}

async fn handle_collection(
    opt: CollectionOperation,
) -> Result<(), Box<dyn Error>> {
    info!("collection {:?}", opt);
    match opt {
        CollectionOperation::Create {
            name,
            shard_count: partitions,
            connection,
        } => {
            let mut client =
                FlareKvClient::connect(connection.server_url).await?;
            let resp = client
                .create_collection(Request::new(CreateCollectionRequest {
                    partition_count: partitions as i32,
                    name: name,
                    ..Default::default()
                }))
                .await?;
            info!("RESP: {:?}\n", resp);
        }
    }
    Ok(())
}