use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Result;
use append_only_vec::AppendOnlyVec;
use async_trait::async_trait;
use hydro_deploy_integration::ServerBindConfig;
use rust_crate::build::BuildOutput;
use rust_crate::tracing_options::TracingOptions;
use tokio::sync::{mpsc, oneshot};
pub mod deployment;
pub use deployment::Deployment;
pub mod progress;
pub mod localhost;
pub use localhost::LocalhostHost;
pub mod ssh;
pub mod gcp;
pub use gcp::GcpComputeEngineHost;
pub mod azure;
pub use azure::AzureHost;
pub mod aws;
pub use aws::{AwsEc2Host, AwsNetwork};
pub mod rust_crate;
pub use rust_crate::RustCrate;
pub mod custom_service;
pub use custom_service::CustomService;
pub mod terraform;
pub mod util;
#[derive(Default)]
pub struct ResourcePool {
pub terraform: terraform::TerraformPool,
}
pub struct ResourceBatch {
pub terraform: terraform::TerraformBatch,
}
impl ResourceBatch {
fn new() -> ResourceBatch {
ResourceBatch {
terraform: terraform::TerraformBatch::default(),
}
}
async fn provision(
self,
pool: &mut ResourcePool,
last_result: Option<Arc<ResourceResult>>,
) -> Result<ResourceResult> {
Ok(ResourceResult {
terraform: self.terraform.provision(&mut pool.terraform).await?,
_last_result: last_result,
})
}
}
#[derive(Debug)]
pub struct ResourceResult {
pub terraform: terraform::TerraformResult,
_last_result: Option<Arc<ResourceResult>>,
}
#[cfg(feature = "profile-folding")]
#[derive(Clone, Debug)]
pub struct TracingResults {
pub folded_data: Vec<u8>,
}
#[async_trait]
pub trait LaunchedBinary: Send + Sync {
fn stdin(&self) -> mpsc::UnboundedSender<String>;
fn deploy_stdout(&self) -> oneshot::Receiver<String>;
fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
#[cfg(feature = "profile-folding")]
fn tracing_results(&self) -> Option<&TracingResults>;
fn exit_code(&self) -> Option<i32>;
async fn wait(&self) -> Result<i32>;
async fn stop(&self) -> Result<()>;
}
#[async_trait]
pub trait LaunchedHost: Send + Sync {
fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
match strategy {
ServerStrategy::Direct(b) => self.base_server_config(b),
ServerStrategy::Many(b) => {
ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
}
ServerStrategy::Demux(demux) => ServerBindConfig::Demux(
demux
.iter()
.map(|(key, underlying)| (*key, self.server_config(underlying)))
.collect(),
),
ServerStrategy::Merge(merge) => ServerBindConfig::Merge(
merge
.iter()
.map(|underlying| self.server_config(underlying))
.collect(),
),
ServerStrategy::Tagged(underlying, id) => {
ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
}
ServerStrategy::Null => ServerBindConfig::Null,
}
}
async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
async fn launch_binary(
&self,
id: String,
binary: &BuildOutput,
args: &[String],
perf: Option<TracingOptions>,
env: &HashMap<String, String>,
pin_to_core: Option<usize>,
) -> Result<Box<dyn LaunchedBinary>>;
async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
}
pub enum BaseServerStrategy {
UnixSocket,
InternalTcpPort(Option<u16>),
ExternalTcpPort(
u16,
),
}
pub enum ServerStrategy {
Direct(BaseServerStrategy),
Many(BaseServerStrategy),
Demux(BTreeMap<u32, ServerStrategy>),
Merge(Box<AppendOnlyVec<ServerStrategy>>),
Tagged(Box<ServerStrategy>, u32),
Null,
}
pub enum ClientStrategy<'a> {
UnixSocket(
usize,
),
InternalTcpPort(
&'a dyn Host,
),
ForwardedTcpPort(
&'a dyn Host,
),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HostTargetType {
Local,
Linux(LinuxCompileType),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LinuxCompileType {
Glibc,
Musl,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PortNetworkHint {
Auto,
TcpPort(Option<u16>),
}
pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
pub trait Host: Any + Send + Sync + Debug {
fn target_type(&self) -> HostTargetType;
fn request_port_base(&self, bind_type: &BaseServerStrategy);
fn request_port(&self, bind_type: &ServerStrategy) {
match bind_type {
ServerStrategy::Direct(base) => self.request_port_base(base),
ServerStrategy::Many(base) => self.request_port_base(base),
ServerStrategy::Demux(demux) => {
for bind_type in demux.values() {
self.request_port(bind_type);
}
}
ServerStrategy::Merge(merge) => {
for bind_type in merge.iter() {
self.request_port(bind_type);
}
}
ServerStrategy::Tagged(underlying, _) => {
self.request_port(underlying);
}
ServerStrategy::Null => {}
}
}
fn id(&self) -> usize;
fn request_custom_binary(&self);
fn collect_resources(&self, resource_batch: &mut ResourceBatch);
fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
fn strategy_as_server<'a>(
&'a self,
connection_from: &dyn Host,
server_tcp_port_hint: PortNetworkHint,
) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
fn can_connect_to(&self, typ: ClientStrategy) -> bool;
}
#[async_trait]
pub trait Service: Send + Sync {
fn collect_resources(&self, resource_batch: &mut ResourceBatch);
async fn deploy(&self, resource_result: &Arc<ResourceResult>) -> Result<()>;
async fn ready(&self) -> Result<()>;
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
}
pub trait ServiceBuilder {
type Service: Service + 'static;
fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service;
}
impl<S: Service + 'static, This: FnOnce(usize, Arc<dyn Host>) -> S> ServiceBuilder for This {
type Service = S;
fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service {
(self)(id, on)
}
}