wick_component_cli/
cli.rs

1#[cfg(feature = "grpc")]
2mod grpc;
3
4use std::net::SocketAddr;
5
6use flow_component::SharedComponent;
7// use std::sync::Arc;
8use tokio::signal;
9use tokio::sync::mpsc::Sender;
10use tracing::{debug, info, warn};
11
12use crate::options::Options;
13pub(crate) type Result<T> = std::result::Result<T, crate::error::CliError>;
14
15#[cfg(feature = "reflection")]
16pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../wick-rpc/src/generated/descriptors.bin");
17
18#[derive(Debug, Clone)]
19#[must_use]
20#[non_exhaustive]
21/// Metadata for the running server.
22pub struct ServerState {
23  /// The address of the RPC server if it is running.
24  pub rpc: Option<ServerControl>,
25
26  /// The ID of the server.
27  pub id: String,
28}
29
30/// Struct that holds control methods and metadata for a running service.
31#[derive(Clone)]
32pub struct ServerControl {
33  /// The address of the RPC server.
34  pub addr: SocketAddr,
35  tx: Sender<ServerMessage>,
36}
37
38impl std::fmt::Debug for ServerControl {
39  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40    f.debug_struct("ServerControl").field("addr", &self.addr).finish()
41  }
42}
43
44impl ServerControl {
45  #[allow(clippy::missing_const_for_fn)]
46  fn maybe_new(opt: Option<(SocketAddr, Sender<ServerMessage>)>) -> Option<Self> {
47    if let Some((addr, tx)) = opt {
48      Some(Self { addr, tx })
49    } else {
50      None
51    }
52  }
53}
54
55impl ServerState {
56  /// Stop the RPC server if it's running.
57  pub async fn stop_rpc_server(&self) {
58    if let Some(ctl) = self.rpc.as_ref() {
59      let _ = ctl.tx.send(ServerMessage::Close).await;
60    }
61  }
62}
63
64#[doc(hidden)]
65pub fn print_info(info: &ServerState) {
66  let mut something_started = false;
67  if let Some(addr) = &info.rpc {
68    let addr = addr.addr;
69    something_started = true;
70    info!("GRPC server bound to {} on port {}", addr.ip(), addr.port());
71  }
72
73  if !something_started {
74    warn!("no server information available, did you intend to start a host without GRPC or a mesh connection?");
75    warn!("if not, try passing the flag --rpc or --mesh to explicitly enable those features.");
76  }
77}
78
79/// Starts an RPC server for the passed [SharedComponent].
80pub async fn start_server(collection: SharedComponent, opts: Option<Options>) -> Result<ServerState> {
81  debug!("starting server with options: {:?}", opts);
82
83  let opts = opts.unwrap_or_default();
84
85  cfg_if::cfg_if! {
86    if #[cfg(feature="grpc")] {
87      let component_service = wick_invocation_server::InvocationServer::new(collection.clone());
88
89      use wick_rpc::rpc::invocation_service_server::InvocationServiceServer;
90      let svc = InvocationServiceServer::new(component_service);
91
92      let rpc_addr = if let Some(rpc_options) = &opts.rpc {
93        if !rpc_options.enabled {
94          None
95        } else {
96          let addr = grpc::start_rpc_server(rpc_options, svc.clone()).await?;
97          Some(addr)
98        }
99      } else {
100        None
101      };
102    } else {
103      let rpc_addr = None;
104    }
105  };
106
107  Ok(ServerState {
108    id: opts.id,
109    rpc: ServerControl::maybe_new(rpc_addr),
110  })
111}
112
113enum ServerMessage {
114  Close,
115}
116
117/// Start a server with the passed [SharedComponent] and keep it
118/// running until the process receives a SIGINT (^C).
119pub async fn init_cli(collection: SharedComponent, opts: Option<Options>) -> Result<()> {
120  let state = start_server(collection, opts).await?;
121  print_info(&state);
122
123  info!("waiting for ctrl-C");
124  signal::ctrl_c().await?;
125  println!(); // start on a new line.
126  state.stop_rpc_server().await;
127
128  Ok(())
129}