1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#[cfg(feature = "grpc")]
mod grpc;

use std::net::SocketAddr;

use flow_component::SharedComponent;
// use std::sync::Arc;
use tokio::signal;
use tokio::sync::mpsc::Sender;
use tracing::{debug, info, warn};

use crate::options::Options;
/// Crate-internal `Result` alias whose error type is fixed to [`crate::error::CliError`].
pub(crate) type Result<T> = std::result::Result<T, crate::error::CliError>;

/// Raw contents of `descriptors.bin`, embedded into the binary at compile time.
///
/// Only compiled when the `reflection` feature is enabled.
// NOTE(review): presumably the protobuf file descriptor set used for gRPC
// reflection — confirm against the code that consumes this constant.
#[cfg(feature = "reflection")]
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../wick-rpc/src/generated/descriptors.bin");

/// Metadata for the running server.
#[derive(Debug, Clone)]
#[must_use]
#[non_exhaustive]
pub struct ServerState {
  /// Control handle for the RPC server (including the address it is bound to),
  /// or `None` if no RPC server was started.
  pub rpc: Option<ServerControl>,

  /// The ID of the server.
  pub id: String,
}

/// Struct that holds control methods and metadata for a running service.
///
/// Cloning yields another handle with the same address and a clone of the
/// message sender.
#[derive(Clone)]
pub struct ServerControl {
  /// The address of the RPC server.
  pub addr: SocketAddr,
  // Sender half of the channel used to deliver `ServerMessage`s; private, and
  // intentionally left out of the hand-written `Debug` output.
  tx: Sender<ServerMessage>,
}

/// Manual `Debug` implementation that prints only the `addr` field; the
/// private `tx` sender is omitted from the output.
impl std::fmt::Debug for ServerControl {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let mut builder = f.debug_struct("ServerControl");
    builder.field("addr", &self.addr);
    builder.finish()
  }
}

impl ServerControl {
  /// Builds a control handle from an optional `(address, sender)` pair.
  ///
  /// Returns `None` when `opt` is `None`; otherwise wraps the address and the
  /// message sender in a [`ServerControl`].
  fn maybe_new(opt: Option<(SocketAddr, Sender<ServerMessage>)>) -> Option<Self> {
    opt.map(|(addr, tx)| Self { addr, tx })
  }
}

impl ServerState {
  /// Stop the RPC server if it's running.
  pub async fn stop_rpc_server(&self) {
    if let Some(ctl) = self.rpc.as_ref() {
      let _ = ctl.tx.send(ServerMessage::Close).await;
    }
  }
}

/// Logs where the RPC server is listening, or warns that nothing was started.
///
/// Emits one `info` line with the bound IP and port when `info.rpc` is set,
/// and two `warn` lines otherwise.
#[doc(hidden)]
pub fn print_info(info: &ServerState) {
  match info.rpc.as_ref() {
    Some(control) => {
      let bound = control.addr;
      info!("GRPC server bound to {} on port {}", bound.ip(), bound.port());
    }
    None => {
      warn!("no server information available, did you intend to start a host without GRPC or a mesh connection?");
      warn!("if not, try passing the flag --rpc or --mesh to explicitly enable those features.");
    }
  }
}

/// Starts an RPC server for the passed [SharedComponent].
///
/// When `opts` is `None`, the default [`Options`] are used. With the `grpc`
/// feature compiled in, a gRPC server is started only if `opts.rpc` is present
/// and its `enabled` flag is set; in every other case the returned
/// [`ServerState`] has `rpc` set to `None`.
///
/// # Errors
///
/// Returns an error if `grpc::start_rpc_server` fails.
pub async fn start_server(collection: SharedComponent, opts: Option<Options>) -> Result<ServerState> {
  debug!("starting server with options: {:?}", opts);

  let opts = opts.unwrap_or_default();

  cfg_if::cfg_if! {
    if #[cfg(feature="grpc")] {
      use wick_rpc::rpc::invocation_service_server::InvocationServiceServer;

      // `collection` and `svc` are each consumed exactly once, so they are
      // moved into their single use instead of being cloned.
      let component_service = wick_invocation_server::InvocationServer::new(collection);
      let svc = InvocationServiceServer::new(component_service);

      let rpc_addr = match &opts.rpc {
        Some(rpc_options) if rpc_options.enabled => {
          let addr = grpc::start_rpc_server(rpc_options, svc).await?;
          Some(addr)
        }
        // RPC options missing, or present but disabled: no server is started.
        _ => None,
      };
    } else {
      let rpc_addr = None;
    }
  };

  Ok(ServerState {
    id: opts.id,
    rpc: ServerControl::maybe_new(rpc_addr),
  })
}

/// Control messages sent to a running server through the channel held by
/// `ServerControl`.
enum ServerMessage {
  /// Sent by `ServerState::stop_rpc_server` to request that the RPC server stop.
  Close,
}

/// Start a server with the passed [SharedComponent] and keep it
/// running until the process receives a SIGINT (^C).
///
/// After the signal arrives the RPC server (if any) is asked to stop before
/// this function returns.
///
/// # Errors
///
/// Returns an error if the server fails to start or if waiting for the
/// ctrl-C signal fails. In the latter case the RPC server is still asked to
/// stop before the error is returned.
pub async fn init_cli(collection: SharedComponent, opts: Option<Options>) -> Result<()> {
  let state = start_server(collection, opts).await?;
  print_info(&state);

  info!("waiting for ctrl-C");
  // Hold on to the outcome instead of returning early with `?`, so the RPC
  // server is asked to stop even when waiting for the signal fails.
  let interrupted = signal::ctrl_c().await;
  if interrupted.is_ok() {
    println!(); // start on a new line.
  }
  state.stop_rpc_server().await;
  interrupted?;

  Ok(())
}