flare_cli/
lib.rs

1use flare_dht::cli::{
2    CollectionOperation, FlareCli, FlareCommands, ServerArgs,
3};
4use flare_dht::metadata::raft::FlareMetadataManager;
5use flare_dht::metadata::MetadataManager;
6use flare_dht::proto::flare_control_server::FlareControlServer;
7use flare_dht::proto::flare_kv_client::FlareKvClient;
8use flare_dht::proto::flare_kv_server::FlareKvServer;
9use flare_dht::proto::CreateCollectionRequest;
10use flare_dht::rpc_server::control_api::FlareControlService;
11use flare_dht::rpc_server::kv_api::FlareKvService;
12use flare_dht::shard::{HashMapShard, HashMapShardFactory, ShardManager};
13use flare_dht::FlareNode;
14use std::error::Error;
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic::Request;
19use tracing::info;
20
21pub async fn start_server(
22    options: ServerArgs,
23) -> Result<Arc<FlareNode<HashMapShard>>, Box<dyn Error>> {
24    info!("use option {options:?}");
25
26    let node_id = options.get_node_id();
27    info!("use node_id: {node_id}");
28
29    let z_session = zenoh::open(zenoh::Config::default()).await.unwrap();
30    let prefix = format!("flare/{}/nodes", options.cluster_id);
31
32    let metadata_manager: Arc<FlareMetadataManager> = Arc::new(
33        FlareMetadataManager::new(
34            node_id,
35            options.get_addr(),
36            options.clone(),
37            z_session.clone(),
38            &prefix,
39        )
40        .await,
41    );
42    metadata_manager.initialize().await?;
43    let shard_manager =
44        Arc::new(ShardManager::new(Box::new(HashMapShardFactory {})));
45    let flare_node = FlareNode::new(
46        options.get_addr(),
47        node_id,
48        metadata_manager.clone(),
49        shard_manager,
50        // metadata_manager.control_pool.clone(),
51        // metadata_manager.data_pool.clone(),
52    )
53    .await;
54
55    let shared_node = Arc::new(flare_node);
56    let flare_node = shared_node.clone();
57    flare_node.start_watch_stream();
58    let flare_kv = FlareKvService::new(shared_node.clone());
59    let flare_control = FlareControlService::new(metadata_manager.clone());
60
61    // let socket: SocketAddr = options.addr.parse()?;
62    if !options.not_server {
63        let socket = SocketAddr::new(
64            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
65            options.port,
66        );
67        info!("start on {}", socket);
68        let reflection_server_v1a =
69            tonic_reflection::server::Builder::configure()
70                .register_encoded_file_descriptor_set(
71                    flare_dht::proto::FILE_DESCRIPTOR_SET,
72                )
73                .build_v1alpha()
74                .unwrap();
75
76        let reflection_server_v1 =
77            tonic_reflection::server::Builder::configure()
78                .register_encoded_file_descriptor_set(
79                    flare_dht::proto::FILE_DESCRIPTOR_SET,
80                )
81                .build_v1()
82                .unwrap();
83
84        if let Some(addr) = options.peer_addr {
85            let flare_node = shared_node.clone();
86            tokio::spawn(async move {
87                let node = flare_node.clone();
88                node.join(&addr).await.unwrap()
89            });
90        };
91
92        tokio::spawn(async move {
93            Server::builder()
94                .add_service(reflection_server_v1a)
95                .add_service(reflection_server_v1)
96                .add_service(FlareKvServer::new(flare_kv))
97                .add_service(FlareControlServer::new(flare_control))
98                .serve(socket)
99                .await
100                .unwrap();
101        });
102    }
103
104    Ok(shared_node)
105}
106
107pub async fn handle_cli(command: FlareCli) -> Result<(), Box<dyn Error>> {
108    match command.command {
109        FlareCommands::Server(server_args) => handle_server(server_args).await,
110        FlareCommands::Collection { opt } => handle_collection(opt).await,
111    }
112}
113
114async fn handle_server(server_args: ServerArgs) -> Result<(), Box<dyn Error>> {
115    let flare_node = start_server(server_args).await?;
116
117    match tokio::signal::ctrl_c().await {
118        Ok(()) => {}
119        Err(err) => {
120            eprintln!("Unable to listen for shutdown signal: {}", err);
121            // we also shut down in case of error
122        }
123    }
124    info!("starting a clean up for shutdown");
125    flare_node.leave().await;
126    info!("done clean up");
127    Ok(())
128}
129
130async fn handle_collection(
131    opt: CollectionOperation,
132) -> Result<(), Box<dyn Error>> {
133    info!("collection {:?}", opt);
134    match opt {
135        CollectionOperation::Create {
136            name,
137            shard_count: partitions,
138            connection,
139        } => {
140            let mut client =
141                FlareKvClient::connect(connection.server_url).await?;
142            let resp = client
143                .create_collection(Request::new(CreateCollectionRequest {
144                    partition_count: partitions as i32,
145                    name: name,
146                    ..Default::default()
147                }))
148                .await?;
149            info!("RESP: {:?}\n", resp);
150        }
151    }
152    Ok(())
153}