wick_component_cli/
cli.rs1#[cfg(feature = "grpc")]
2mod grpc;
3
4use std::net::SocketAddr;
5
6use flow_component::SharedComponent;
7use tokio::signal;
9use tokio::sync::mpsc::Sender;
10use tracing::{debug, info, warn};
11
12use crate::options::Options;
13pub(crate) type Result<T> = std::result::Result<T, crate::error::CliError>;
14
15#[cfg(feature = "reflection")]
16pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../wick-rpc/src/generated/descriptors.bin");
17
18#[derive(Debug, Clone)]
19#[must_use]
20#[non_exhaustive]
21pub struct ServerState {
23 pub rpc: Option<ServerControl>,
25
26 pub id: String,
28}
29
30#[derive(Clone)]
32pub struct ServerControl {
33 pub addr: SocketAddr,
35 tx: Sender<ServerMessage>,
36}
37
38impl std::fmt::Debug for ServerControl {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("ServerControl").field("addr", &self.addr).finish()
41 }
42}
43
44impl ServerControl {
45 #[allow(clippy::missing_const_for_fn)]
46 fn maybe_new(opt: Option<(SocketAddr, Sender<ServerMessage>)>) -> Option<Self> {
47 if let Some((addr, tx)) = opt {
48 Some(Self { addr, tx })
49 } else {
50 None
51 }
52 }
53}
54
55impl ServerState {
56 pub async fn stop_rpc_server(&self) {
58 if let Some(ctl) = self.rpc.as_ref() {
59 let _ = ctl.tx.send(ServerMessage::Close).await;
60 }
61 }
62}
63
64#[doc(hidden)]
65pub fn print_info(info: &ServerState) {
66 let mut something_started = false;
67 if let Some(addr) = &info.rpc {
68 let addr = addr.addr;
69 something_started = true;
70 info!("GRPC server bound to {} on port {}", addr.ip(), addr.port());
71 }
72
73 if !something_started {
74 warn!("no server information available, did you intend to start a host without GRPC or a mesh connection?");
75 warn!("if not, try passing the flag --rpc or --mesh to explicitly enable those features.");
76 }
77}
78
79pub async fn start_server(collection: SharedComponent, opts: Option<Options>) -> Result<ServerState> {
81 debug!("starting server with options: {:?}", opts);
82
83 let opts = opts.unwrap_or_default();
84
85 cfg_if::cfg_if! {
86 if #[cfg(feature="grpc")] {
87 let component_service = wick_invocation_server::InvocationServer::new(collection.clone());
88
89 use wick_rpc::rpc::invocation_service_server::InvocationServiceServer;
90 let svc = InvocationServiceServer::new(component_service);
91
92 let rpc_addr = if let Some(rpc_options) = &opts.rpc {
93 if !rpc_options.enabled {
94 None
95 } else {
96 let addr = grpc::start_rpc_server(rpc_options, svc.clone()).await?;
97 Some(addr)
98 }
99 } else {
100 None
101 };
102 } else {
103 let rpc_addr = None;
104 }
105 };
106
107 Ok(ServerState {
108 id: opts.id,
109 rpc: ServerControl::maybe_new(rpc_addr),
110 })
111}
112
113enum ServerMessage {
114 Close,
115}
116
117pub async fn init_cli(collection: SharedComponent, opts: Option<Options>) -> Result<()> {
120 let state = start_server(collection, opts).await?;
121 print_info(&state);
122
123 info!("waiting for ctrl-C");
124 signal::ctrl_c().await?;
125 println!(); state.stop_rpc_server().await;
127
128 Ok(())
129}