use serde::Deserialize;
use std::collections::HashMap;
use std::path::Path;
/// A runnable unit declared in the manifest: either a fleet ship or a bay.
#[derive(Debug, Clone)]
pub enum Vessel {
/// A ship, paired with the name of the fleet group it was declared under.
Ship { group: String, config: ShipConfig },
/// A bay, paired with the bay type it was declared under.
Bay { bay_type: String, config: BayConfig },
}
impl Vessel {
pub fn name(&self) -> &str {
match self {
Vessel::Ship { config, .. } => &config.name,
Vessel::Bay { config, .. } => &config.name,
}
}
pub fn routes(&self) -> &[RouteConfig] {
match self {
Vessel::Ship { config, .. } => &config.routes,
Vessel::Bay { config, .. } => &config.routes,
}
}
pub fn is_critical(&self) -> bool {
match self {
Vessel::Ship { config, .. } => config.critical,
Vessel::Bay { config, .. } => config.critical,
}
}
pub fn command(&self) -> &str {
match self {
Vessel::Ship { config, .. } => &config.command,
Vessel::Bay { config, .. } => &config.command,
}
}
pub fn args(&self) -> &[String] {
match self {
Vessel::Ship { config, .. } => &config.args,
Vessel::Bay { config, .. } => &config.args,
}
}
pub fn env(&self) -> &HashMap<String, String> {
match self {
Vessel::Ship { config, .. } => &config.env,
Vessel::Bay { config, .. } => &config.env,
}
}
pub fn depends_on(&self) -> &[String] {
match self {
Vessel::Ship { config, .. } => &config.depends_on,
Vessel::Bay { config, .. } => &config.depends_on,
}
}
pub fn is_bay(&self) -> bool {
matches!(self, Vessel::Bay { .. })
}
pub fn is_ship(&self) -> bool {
matches!(self, Vessel::Ship { .. })
}
}
/// Top-level manifest deserialized from TOML.
///
/// Only some fields are filled by serde; the `#[serde(skip)]` fields are
/// populated afterwards by `parse_vessels` and `merge_modules`.
#[derive(Debug, Deserialize)]
pub struct Manifest {
/// The `mothership` table; defaults when absent.
#[serde(default)]
pub mothership: MothershipConfig,
/// The `base` templates merged into ships, bays and modules.
#[serde(default)]
pub base: BaseTemplates,
/// Every other top-level key, captured untyped. `parse_vessels` removes the
/// `fleet` and `bays` entries from this map and decodes them.
#[serde(default, flatten)]
fleet_raw: HashMap<String, toml::Value>,
/// Raw `modules` entries as written in the file, before base merging.
#[serde(default, rename = "modules")]
modules_raw: Vec<ModuleRaw>,
/// Modules after merging with `base.module`; filled by `merge_modules`.
#[serde(skip)]
pub modules: Vec<Module>,
/// Ships keyed by fleet group name; filled by `parse_vessels`.
#[serde(skip)]
pub fleet: HashMap<String, Vec<ShipConfig>>,
/// Bays keyed by bay type; filled by `parse_vessels`.
#[serde(skip)]
pub bays: HashMap<String, Vec<BayConfig>>,
/// Ships and bays in one list, pushed by `parse_vessels` while walking the
/// `[[fleet.*]]` / `[[bays.*]]` headers found in the manifest text.
#[serde(skip)]
pub vessels: Vec<Vessel>,
}
/// Container for the per-kind default templates under the `base` table.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct BaseTemplates {
/// Defaults merged into every ship by `ShipConfig::merge_base`.
#[serde(default)]
pub ship: BaseShip,
/// Defaults merged into every bay by `BayConfig::merge_base`.
#[serde(default)]
pub bay: BaseBay,
/// Defaults merged into every module by `Module::from_raw`.
#[serde(default)]
pub module: BaseModule,
}
/// Default values inherited by ships (see `ShipConfig::merge_base`).
#[derive(Debug, Clone, Default, Deserialize)]
pub struct BaseShip {
/// Environment entries; a ship's own entries win on key collisions.
#[serde(default)]
pub env: HashMap<String, String>,
/// Fallback `critical` value used when a ship does not set its own.
pub critical: Option<bool>,
/// Tags placed before the ship's own tags in the merged list.
#[serde(default)]
pub tags: Vec<String>,
/// Comment used when a ship has none.
pub comment: Option<String>,
}
/// Default values inherited by bays (see `BayConfig::merge_base`).
#[derive(Debug, Clone, Default, Deserialize)]
pub struct BaseBay {
/// Environment entries; a bay's own entries win on key collisions.
#[serde(default)]
pub env: HashMap<String, String>,
/// Fallback `critical` value used when a bay does not set its own.
pub critical: Option<bool>,
/// Tags placed before the bay's own tags in the merged list.
#[serde(default)]
pub tags: Vec<String>,
/// Comment used when a bay has none.
pub comment: Option<String>,
}
/// Default values inherited by modules (see `Module::from_raw`).
#[derive(Debug, Clone, Default, Deserialize)]
pub struct BaseModule {
/// Fallback phase used when a module does not set its own.
pub phase: Option<Phase>,
/// Tags placed before the module's own tags in the merged list.
#[serde(default)]
pub tags: Vec<String>,
/// Comment used when a module has none.
pub comment: Option<String>,
/// Config entries; a module's own entries win on key collisions.
#[serde(default)]
pub config: HashMap<String, String>,
}
/// One entry of `mothership.static_dirs`.
#[derive(Debug, Clone, Deserialize)]
pub struct StaticDirConfig {
/// Required path string (presumably the directory to serve — confirm with the consumer).
pub path: String,
/// Prefix string; defaults to `"/static"` when omitted.
#[serde(default = "default_static_prefix")]
pub prefix: String,
/// Optional bind name (presumably a key of `mothership.bind` — TODO confirm).
pub bind: Option<String>,
}
/// Serde default for `StaticDirConfig::prefix`.
fn default_static_prefix() -> String {
    String::from("/static")
}
/// One entry of `mothership.uplinks`.
#[derive(Debug, Clone, Deserialize)]
pub struct Uplink {
/// Target URL; environment references in it are expanded after parsing.
pub url: String,
/// Name of this uplink.
pub name: String,
/// Timeout kept as the raw string from the manifest; defaults to `"5s"`.
#[serde(default = "default_uplink_timeout")]
pub timeout: String,
}
/// Serde default for `Uplink::timeout`.
fn default_uplink_timeout() -> String {
    String::from("5s")
}
#[derive(Debug, Clone, Default, Deserialize)]
pub struct FlagshipConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_election")]
pub election: String,
pub election_url: Option<String>,
#[serde(default)]
pub static_flagship: StaticFlagship,
#[serde(default = "default_election_timeout")]
pub election_timeout: u64,
#[serde(default = "default_prelaunch_timeout")]
pub prelaunch_timeout: u64,
#[serde(default = "default_heartbeat_interval")]
pub heartbeat_interval: u64,
}
/// Serde default for `FlagshipConfig::election`.
fn default_election() -> String {
    String::from("static")
}

/// Serde default for `FlagshipConfig::election_timeout`.
fn default_election_timeout() -> u64 {
    30_u64
}

/// Serde default for `FlagshipConfig::prelaunch_timeout`.
fn default_prelaunch_timeout() -> u64 {
    300_u64
}

/// Serde default for `FlagshipConfig::heartbeat_interval`.
fn default_heartbeat_interval() -> u64 {
    5_u64
}
/// Rule used by `evaluate` to produce a yes/no flagship answer.
#[derive(Debug, Clone, Default)]
pub enum StaticFlagship {
/// No rule configured; `evaluate` returns `false`.
#[default]
None,
/// Name of an environment variable (stored without the leading `$`).
EnvVar(String),
/// Shell command whose trimmed stdout is compared against `equals`.
Command { command: String, equals: String },
}
/// Accepts either a `"$VAR"` string, a `{ command, equals }` table, or an
/// absent/unit value (which maps to `StaticFlagship::None`).
impl<'de> Deserialize<'de> for StaticFlagship {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        use serde::de::{self, IgnoredAny, MapAccess, Visitor};

        struct StaticFlagshipVisitor;

        impl<'de> Visitor<'de> for StaticFlagshipVisitor {
            type Value = StaticFlagship;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string starting with $ or an object with command/equals")
            }

            /// String form: must be `$NAME`; the `$` is stripped before storing.
            fn visit_str<E>(self, s: &str) -> Result<StaticFlagship, E>
            where
                E: de::Error,
            {
                match s.strip_prefix('$') {
                    Some(var) => Ok(StaticFlagship::EnvVar(var.to_string())),
                    None => Err(de::Error::custom(
                        "static_flagship string must start with $ for env var",
                    )),
                }
            }

            /// Table form: requires both `command` and `equals`; other keys are ignored.
            fn visit_map<M>(self, mut map: M) -> Result<StaticFlagship, M::Error>
            where
                M: MapAccess<'de>,
            {
                let mut command: Option<String> = None;
                let mut equals: Option<String> = None;
                while let Some(key) = map.next_key::<String>()? {
                    match key.as_str() {
                        "command" => command = Some(map.next_value()?),
                        "equals" => equals = Some(map.next_value()?),
                        _ => {
                            // Skip the value without building it; `IgnoredAny`
                            // works for any serde format, not just TOML.
                            let _: IgnoredAny = map.next_value()?;
                        }
                    }
                }
                match (command, equals) {
                    (Some(command), Some(equals)) => {
                        Ok(StaticFlagship::Command { command, equals })
                    }
                    _ => Err(de::Error::custom(
                        "static_flagship object requires both 'command' and 'equals' fields",
                    )),
                }
            }

            fn visit_none<E>(self) -> Result<StaticFlagship, E>
            where
                E: de::Error,
            {
                Ok(StaticFlagship::None)
            }

            fn visit_unit<E>(self) -> Result<StaticFlagship, E>
            where
                E: de::Error,
            {
                Ok(StaticFlagship::None)
            }
        }

        deserializer.deserialize_any(StaticFlagshipVisitor)
    }
}
impl StaticFlagship {
pub fn evaluate(&self) -> std::io::Result<bool> {
match self {
StaticFlagship::None => Ok(false),
StaticFlagship::EnvVar(var) => {
let val = std::env::var(var).unwrap_or_default().to_lowercase();
Ok(val == "true" || val == "1" || val == "yes")
}
StaticFlagship::Command { command, equals } => {
let output = std::process::Command::new("sh")
.arg("-c")
.arg(command)
.output()?;
let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
Ok(stdout == *equals)
}
}
}
}
impl FlagshipConfig {
    /// Returns `election_url` with environment references expanded.
    ///
    /// `None` when no URL is configured; otherwise `Some` carrying the result
    /// of `expand_env_vars` on the raw URL.
    pub fn expanded_election_url(&self) -> Option<anyhow::Result<String>> {
        let raw = self.election_url.as_deref()?;
        Some(expand_env_vars(raw))
    }
}
/// One entry of `mothership.prelaunch`.
#[derive(Debug, Clone, Deserialize)]
pub struct Prelaunch {
/// Job name; validated for uniqueness (case-insensitively) among prelaunch jobs.
pub name: String,
/// Command of the job.
pub command: String,
/// Arguments of the command; empty when omitted.
#[serde(default)]
pub args: Vec<String>,
/// Environment entries; values are expanded after parsing.
#[serde(default)]
pub env: HashMap<String, String>,
/// Names of other prelaunch jobs; validation rejects unknown names.
#[serde(default)]
pub depends_on: Vec<String>,
/// Free-form comment.
pub comment: Option<String>,
}
/// `mothership.cors_cache`: either a bare boolean or a table of settings.
///
/// `untagged` makes serde try the variants in declaration order.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum CorsCacheConfig {
/// Boolean shorthand; the TTL and entry limit then come from the default functions.
Enabled(bool),
/// Table form; every field is optional and falls back to its default function.
Full {
#[serde(default = "default_cors_cache_enabled")]
enabled: bool,
#[serde(default = "default_cors_cache_ttl")]
default_ttl: u64,
#[serde(default = "default_cors_cache_max_entries")]
max_entries: usize,
},
}
/// Serde default for `CorsCacheConfig::Full::enabled`.
fn default_cors_cache_enabled() -> bool {
    !false
}

/// Serde default for `CorsCacheConfig::Full::default_ttl`.
fn default_cors_cache_ttl() -> u64 {
    3_600
}

/// Serde default for `CorsCacheConfig::Full::max_entries`.
fn default_cors_cache_max_entries() -> usize {
    10_000
}
impl Default for CorsCacheConfig {
fn default() -> Self {
Self::Enabled(false)
}
}
impl CorsCacheConfig {
    /// Whether the cache is enabled, for either representation.
    pub fn is_enabled(&self) -> bool {
        let (Self::Enabled(enabled) | Self::Full { enabled, .. }) = self;
        *enabled
    }

    /// Configured TTL, or the default when only the boolean form was given.
    pub fn default_ttl(&self) -> u64 {
        if let Self::Full { default_ttl, .. } = self {
            *default_ttl
        } else {
            default_cors_cache_ttl()
        }
    }

    /// Configured entry limit, or the default when only the boolean form was given.
    pub fn max_entries(&self) -> usize {
        if let Self::Full { max_entries, .. } = self {
            *max_entries
        } else {
            default_cors_cache_max_entries()
        }
    }
}
/// The `mothership` table of the manifest.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct MothershipConfig {
/// Named bind addresses, keyed by the name given in `mothership.bind`.
#[serde(default)]
pub bind: HashMap<String, Bind>,
/// Optional metrics port; validation rejects it when it equals the port of a TCP bind.
pub metrics_port: Option<u16>,
/// Environment entries declared on the mothership itself.
#[serde(default)]
pub env: HashMap<String, String>,
/// Static directory entries.
#[serde(default)]
pub static_dirs: Vec<StaticDirConfig>,
/// Compression switch; `false` when omitted.
#[serde(default)]
pub compression: bool,
/// CORS cache settings; disabled when omitted.
#[serde(default)]
pub cors_cache: CorsCacheConfig,
/// Uplink entries; their URLs are expanded after parsing.
#[serde(default)]
pub uplinks: Vec<Uplink>,
/// Prelaunch jobs; names and dependencies are checked during validation.
#[serde(default)]
pub prelaunch: Vec<Prelaunch>,
/// Flagship settings.
#[serde(default)]
pub flagship: FlagshipConfig,
/// Optional rate-limiting settings.
pub rate_limiting: Option<RateLimitingConfig>,
}
/// The optional `mothership.rate_limiting` table; both limits are optional.
#[derive(Debug, Clone, Deserialize)]
pub struct RateLimitingConfig {
/// Global limit (the name suggests requests per second — confirm with the consumer).
pub global_rps: Option<f64>,
/// Per-IP limit (the name suggests requests per minute — confirm with the consumer).
pub per_ip_rpm: Option<f64>,
}
/// One `[[fleet.<group>]]` entry.
#[derive(Debug, Clone, Deserialize)]
pub struct ShipConfig {
/// Ship name; validated as non-empty, ASCII, space-free and unique among vessels.
pub name: String,
/// Command of the ship.
pub command: String,
/// Arguments of the command; empty when omitted.
#[serde(default)]
pub args: Vec<String>,
/// Optional bind address of the ship.
pub bind: Option<Bind>,
/// Optional healthcheck string, stored as written.
pub healthcheck: Option<String>,
/// Names of vessels this ship depends on; checked for unknown names and cycles.
#[serde(default)]
pub depends_on: Vec<String>,
/// Routes declared on this ship.
#[serde(default)]
pub routes: Vec<RouteConfig>,
/// Environment entries; merged with `base.ship.env`, then expanded.
#[serde(default)]
pub env: HashMap<String, String>,
/// The `critical` key exactly as written in the manifest; `None` when not set.
#[serde(default, rename = "critical")]
critical_override: Option<bool>,
/// Resolved flag computed by `merge_base`: own value, else base value, else `true`.
#[serde(skip)]
pub critical: bool,
/// One-shot flag; `false` when omitted.
#[serde(default)]
pub oneshot: bool,
/// Tags; after `merge_base` the base tags come first, without duplicates of them.
#[serde(default)]
pub tags: Vec<String>,
/// Free-form comment; falls back to the base comment in `merge_base`.
pub comment: Option<String>,
}
impl ShipConfig {
    /// Layers the `base.ship` template underneath this ship's own settings.
    ///
    /// * `env`: base entries are added only for keys the ship does not define.
    /// * `critical`: own override, else the base value, else `true`.
    /// * `tags`: base tags first, followed by own tags not already present.
    /// * `comment`: kept if set, otherwise copied from the base.
    fn merge_base(&mut self, base: &BaseShip) {
        for (key, value) in &base.env {
            self.env
                .entry(key.clone())
                .or_insert_with(|| value.clone());
        }

        self.critical = match (self.critical_override, base.critical) {
            (Some(own), _) => own,
            (None, Some(inherited)) => inherited,
            (None, None) => true,
        };

        // Start from the base tags, then append the ship's own unseen tags in order.
        let own_tags = std::mem::replace(&mut self.tags, base.tags.clone());
        for tag in own_tags {
            if !self.tags.contains(&tag) {
                self.tags.push(tag);
            }
        }

        if self.comment.is_none() {
            self.comment.clone_from(&base.comment);
        }
    }
}
/// One `[[bays.<bay_type>]]` entry.
#[derive(Debug, Clone, Deserialize)]
pub struct BayConfig {
/// Bay name; validated as non-empty, ASCII, space-free and unique among vessels.
pub name: String,
/// Command of the bay.
pub command: String,
/// Arguments of the command; empty when omitted.
#[serde(default)]
pub args: Vec<String>,
/// Names of vessels this bay depends on; checked for unknown names and cycles.
#[serde(default)]
pub depends_on: Vec<String>,
/// Routes declared on this bay.
#[serde(default)]
pub routes: Vec<RouteConfig>,
/// Environment entries; merged with `base.bay.env`, then expanded.
#[serde(default)]
pub env: HashMap<String, String>,
/// Bay-specific string settings; values are expanded after parsing.
#[serde(default)]
pub config: HashMap<String, String>,
/// The `critical` key exactly as written in the manifest; `None` when not set.
#[serde(default, rename = "critical")]
critical_override: Option<bool>,
/// Resolved flag computed by `merge_base`: own value, else base value, else `true`.
#[serde(skip)]
pub critical: bool,
/// Tags; after `merge_base` the base tags come first, without duplicates of them.
#[serde(default)]
pub tags: Vec<String>,
/// Free-form comment; falls back to the base comment in `merge_base`.
pub comment: Option<String>,
/// Bay type taken from the table name; set by `parse_vessels`, not by serde.
#[serde(skip)]
pub bay_type: String,
}
impl BayConfig {
    /// Layers the `base.bay` template underneath this bay's own settings.
    ///
    /// * `env`: base entries are added only for keys the bay does not define.
    /// * `critical`: own override, else the base value, else `true`.
    /// * `tags`: base tags first, followed by own tags not already present.
    /// * `comment`: kept if set, otherwise copied from the base.
    fn merge_base(&mut self, base: &BaseBay) {
        for (key, value) in &base.env {
            self.env
                .entry(key.clone())
                .or_insert_with(|| value.clone());
        }

        self.critical = match (self.critical_override, base.critical) {
            (Some(own), _) => own,
            (None, Some(inherited)) => inherited,
            (None, None) => true,
        };

        // Start from the base tags, then append the bay's own unseen tags in order.
        let own_tags = std::mem::replace(&mut self.tags, base.tags.clone());
        for tag in own_tags {
            if !self.tags.contains(&tag) {
                self.tags.push(tag);
            }
        }

        if self.comment.is_none() {
            self.comment.clone_from(&base.comment);
        }
    }
}
/// A route attached to a ship or bay.
///
/// Deserialized by hand from either a `"bind:pattern"` string or a table.
#[derive(Debug, Clone)]
pub struct RouteConfig {
/// Bind name: the text before the first `:` in the string form, or the `bind` key.
pub bind: String,
/// Pattern text: everything after the first `:` in the string form, or the `pattern` key.
pub pattern: String,
/// Optional `strip_prefix` value; only available in the table form.
pub strip_prefix: Option<String>,
/// Optional user-agent filter parsed from the `ua_filter` key of the table form.
pub ua_filter: Option<UaFilter>,
}
/// User-agent filter attached to a route; see `UaFilter::matches` for the rules.
#[derive(Debug, Clone)]
pub enum UaFilter {
/// Matches when the detected kind is `Chromium`, `Firefox` or `Safari`.
Browser,
/// Matches one detected kind, compared ASCII case-insensitively.
BrowserKind(String),
/// Matches user-agent headers containing `claude` or `anthropic`.
Llm,
/// Matches user-agent headers containing one of a fixed list of bot/tool substrings.
Bot,
/// Regular expression applied to the raw user-agent header.
Pattern(String),
}
impl UaFilter {
    /// Parses a filter keyword or pattern.
    ///
    /// Keywords are matched case-insensitively (`chrome` is an alias for
    /// `chromium`). Anything else becomes a `Pattern`, with one leading `~`
    /// removed when present; the pattern keeps its original casing.
    pub fn parse(s: &str) -> Self {
        let keyword = s.to_lowercase();
        match keyword.as_str() {
            "browser" => Self::Browser,
            "llm" => Self::Llm,
            "bot" => Self::Bot,
            "chromium" | "chrome" => Self::BrowserKind("chromium".to_string()),
            "firefox" => Self::BrowserKind("firefox".to_string()),
            "safari" => Self::BrowserKind("safari".to_string()),
            _ => Self::Pattern(s.strip_prefix('~').unwrap_or(s).to_string()),
        }
    }

    /// Tests a request against this filter.
    ///
    /// `ua_header` is the raw user-agent text; `ua_kind` is an already
    /// detected browser kind, if any. A `Pattern` that fails to compile as a
    /// regular expression never matches. The expression is compiled on each call.
    pub fn matches(&self, ua_header: &str, ua_kind: Option<&str>) -> bool {
        match self {
            Self::Browser => match ua_kind {
                Some("Chromium" | "Firefox" | "Safari") => true,
                _ => false,
            },
            Self::BrowserKind(kind) => match ua_kind {
                Some(detected) => detected.eq_ignore_ascii_case(kind),
                None => false,
            },
            Self::Llm => {
                let lowered = ua_header.to_lowercase();
                ["claude", "anthropic"]
                    .iter()
                    .any(|needle| lowered.contains(needle))
            }
            Self::Bot => {
                let lowered = ua_header.to_lowercase();
                [
                    "bot",
                    "crawler",
                    "spider",
                    "scraper",
                    "curl",
                    "wget",
                    "python",
                    "httpclient",
                ]
                .iter()
                .any(|needle| lowered.contains(needle))
            }
            Self::Pattern(pattern) => match regex::Regex::new(pattern) {
                Ok(re) => re.is_match(ua_header),
                Err(_) => false,
            },
        }
    }
}
/// Renders a route in its shorthand form, `bind:pattern`.
impl std::fmt::Display for RouteConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.bind)?;
        f.write_str(":")?;
        f.write_str(&self.pattern)
    }
}
impl<'de> Deserialize<'de> for RouteConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, MapAccess, Visitor};
struct RouteConfigVisitor;
impl<'de> Visitor<'de> for RouteConfigVisitor {
type Value = RouteConfig;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a route config object or a string pattern")
}
fn visit_map<M>(self, mut map: M) -> Result<RouteConfig, M::Error>
where
M: MapAccess<'de>,
{
let mut bind = None;
let mut pattern = None;
let mut strip_prefix = None;
let mut ua_filter = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"bind" => bind = Some(map.next_value()?),
"pattern" => pattern = Some(map.next_value()?),
"strip_prefix" => strip_prefix = Some(map.next_value()?),
"ua_filter" => {
let filter_str: String = map.next_value()?;
ua_filter = Some(UaFilter::parse(&filter_str));
}
_ => {
let _: toml::Value = map.next_value()?;
}
}
}
Ok(RouteConfig {
bind: bind.ok_or_else(|| de::Error::missing_field("bind"))?,
pattern: pattern.ok_or_else(|| de::Error::missing_field("pattern"))?,
strip_prefix,
ua_filter,
})
}
fn visit_str<E>(self, s: &str) -> Result<RouteConfig, E>
where
E: de::Error,
{
if let Some((bind, pattern)) = s.split_once(':') {
Ok(RouteConfig {
bind: bind.to_string(),
pattern: pattern.to_string(),
strip_prefix: None,
ua_filter: None,
})
} else {
Err(de::Error::custom(format!(
"Route '{}' missing bind prefix (e.g., 'http:{}')",
s, s
)))
}
}
}
deserializer.deserialize_any(RouteConfigVisitor)
}
}
/// A `[[modules]]` entry exactly as written, before `base.module` is applied.
#[derive(Debug, Clone, Deserialize)]
pub struct ModuleRaw {
/// Module name.
pub name: String,
/// The `wasm` value, stored as written.
pub wasm: String,
/// Route strings; empty when omitted.
#[serde(default)]
pub routes: Vec<String>,
/// Phase, if the entry sets one.
pub phase: Option<Phase>,
/// Module-specific string settings.
#[serde(default)]
pub config: HashMap<String, String>,
/// Tags declared on the entry.
#[serde(default)]
pub tags: Vec<String>,
/// Free-form comment.
pub comment: Option<String>,
}
/// A module after merging its raw entry with `base.module` (see `Module::from_raw`).
#[derive(Debug, Clone)]
pub struct Module {
/// Module name; validated for uniqueness (case-insensitively) among modules.
pub name: String,
/// The `wasm` value from the raw entry.
pub wasm: String,
/// Route strings from the raw entry.
pub routes: Vec<String>,
/// Resolved phase: own value, else base value, else the `Phase` default.
pub phase: Phase,
/// Base config overlaid with the module's own entries; values are expanded after parsing.
pub config: HashMap<String, String>,
/// Base tags followed by the module's own tags not already present.
pub tags: Vec<String>,
/// Own comment, or the base comment when the module has none.
pub comment: Option<String>,
}
impl Module {
fn from_raw(raw: ModuleRaw, base: &BaseModule) -> Self {
let mut merged_config = base.config.clone();
merged_config.extend(raw.config);
let mut merged_tags = base.tags.clone();
for tag in &raw.tags {
if !merged_tags.contains(tag) {
merged_tags.push(tag.clone());
}
}
Self {
name: raw.name,
wasm: raw.wasm,
routes: raw.routes,
phase: raw.phase.or(base.phase).unwrap_or_default(),
config: merged_config,
tags: merged_tags,
comment: raw.comment.or_else(|| base.comment.clone()),
}
}
}
/// Phase a module is attached to; written in lowercase in the manifest
/// (`"request"` / `"response"`). `Request` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Phase {
#[default]
Request,
Response,
}
/// A bind address: a TCP host/port pair or a Unix socket path.
#[derive(Debug, Clone)]
pub enum Bind {
/// TCP address. `proxy_protocol` can only be turned on through the table
/// form of the manifest value; the string form always leaves it `false`.
Tcp {
host: String,
port: u16,
proxy_protocol: bool,
},
/// Unix socket path: the text after the `unix://` prefix.
Unix {
path: String,
},
}
impl<'de> Deserialize<'de> for Bind {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, MapAccess, Visitor};
struct BindVisitor;
impl<'de> Visitor<'de> for BindVisitor {
type Value = Bind;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a bind address string or object with addr and proxy_protocol")
}
fn visit_str<E>(self, s: &str) -> Result<Bind, E>
where
E: de::Error,
{
Bind::parse(s).map_err(de::Error::custom)
}
fn visit_map<M>(self, mut map: M) -> Result<Bind, M::Error>
where
M: MapAccess<'de>,
{
let mut addr: Option<String> = None;
let mut proxy_protocol = false;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"addr" => addr = Some(map.next_value()?),
"proxy_protocol" => proxy_protocol = map.next_value()?,
_ => {
let _: toml::Value = map.next_value()?;
}
}
}
let addr_str = addr.ok_or_else(|| de::Error::missing_field("addr"))?;
let mut bind = Bind::parse(&addr_str).map_err(de::Error::custom)?;
if let Bind::Tcp {
proxy_protocol: ref mut pp,
..
} = bind
{
*pp = proxy_protocol;
} else if proxy_protocol {
return Err(de::Error::custom(
"proxy_protocol is only supported for TCP binds",
));
}
Ok(bind)
}
}
deserializer.deserialize_any(BindVisitor)
}
}
impl Bind {
    /// Parses a bind address string.
    ///
    /// Accepted forms, tried in this order:
    /// * `unix://<path>` becomes a Unix bind;
    /// * `tcp://<host>:<port>` becomes a TCP bind;
    /// * a bare port number becomes a TCP bind on `127.0.0.1`;
    /// * anything else containing `:` is treated as `<host>:<port>`.
    ///
    /// # Errors
    /// Returns a description when none of the forms apply or the port is not
    /// a valid `u16`.
    pub fn parse(s: &str) -> Result<Self, String> {
        if let Some(path) = s.strip_prefix("unix://") {
            return Ok(Bind::Unix {
                path: path.to_string(),
            });
        }
        if let Some(addr) = s.strip_prefix("tcp://") {
            return Self::parse_tcp(addr);
        }
        if let Ok(port) = s.parse::<u16>() {
            return Ok(Bind::Tcp {
                host: "127.0.0.1".to_string(),
                port,
                proxy_protocol: false,
            });
        }
        if s.contains(':') {
            return Self::parse_tcp(s);
        }
        Err(format!("Invalid bind format: {s}"))
    }

    /// Splits `<host>:<port>` at the last `:` so that hosts which themselves
    /// contain colons (IPv6 literals) keep their full text.
    fn parse_tcp(addr: &str) -> Result<Self, String> {
        let (host, port_str) = addr
            .rsplit_once(':')
            .ok_or_else(|| format!("Missing port in: {addr}"))?;
        let port = port_str
            .parse::<u16>()
            .map_err(|_| format!("Invalid port: {port_str}"))?;
        Ok(Bind::Tcp {
            host: host.to_string(),
            port,
            proxy_protocol: false,
        })
    }

    /// `true` only for TCP binds with `proxy_protocol` switched on.
    pub fn has_proxy_protocol(&self) -> bool {
        matches!(
            self,
            Bind::Tcp {
                proxy_protocol: true,
                ..
            }
        )
    }

    /// Base URL to reach this bind from the local machine.
    ///
    /// Wildcard listen addresses are replaced by the matching loopback
    /// address (`0.0.0.0` by `127.0.0.1`, `::` / `[::]` by `[::1]`). IPv6
    /// literals are wrapped in brackets when they are not already, because a
    /// bare IPv6 literal cannot be followed by `:port` in a URL. Unix binds
    /// yield `unix://<path>`.
    pub fn healthcheck_base(&self) -> String {
        match self {
            Bind::Tcp { host, port, .. } => {
                let host = match host.as_str() {
                    "0.0.0.0" => "127.0.0.1",
                    "::" | "[::]" => "[::1]",
                    other => other,
                };
                if host.contains(':') && !host.starts_with('[') {
                    format!("http://[{host}]:{port}")
                } else {
                    format!("http://{host}:{port}")
                }
            }
            Bind::Unix { path } => format!("unix://{path}"),
        }
    }
}
/// Checks that a name is non-empty, pure ASCII and free of spaces.
///
/// `kind` is the label used in error messages (for example `"Ship"` or
/// `"Bay"`). When `group` is given, the message also names the manifest
/// section the entry came from: `bays.<group>` for bays and `fleet.<group>`
/// for everything else.
///
/// # Errors
/// Returns a descriptive error for the first rule the name violates.
fn validate_name(name: &str, kind: &str, group: Option<&str>) -> anyhow::Result<()> {
    // Bays live under `[[bays.*]]`, not `[[fleet.*]]`; pick the section that
    // matches the kind so the message points at the real table.
    let section = if kind.eq_ignore_ascii_case("bay") {
        "bays"
    } else {
        "fleet"
    };
    let context = group
        .map(|g| format!(" in {section}.{g}"))
        .unwrap_or_default();
    if name.is_empty() {
        anyhow::bail!("Empty {} name{}", kind.to_lowercase(), context);
    }
    if !name.is_ascii() {
        anyhow::bail!(
            "{} name '{}'{} contains non-ASCII characters",
            kind,
            name,
            context
        );
    }
    if name.contains(' ') {
        anyhow::bail!("{} name '{}'{} contains spaces", kind, name, context);
    }
    Ok(())
}
impl Manifest {
/// Reads the file at `path` as UTF-8 text and parses it as a manifest.
///
/// # Errors
/// Fails when the file cannot be read or when `parse` rejects its content.
pub fn load<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
    let manifest_path: &Path = path.as_ref();
    let text = std::fs::read_to_string(manifest_path)?;
    Self::parse(text.as_str())
}
/// Parses manifest TOML text into a fully processed `Manifest`.
///
/// The steps run in a fixed order and the first failure aborts parsing.
///
/// # Errors
/// Returns the TOML error, or the error of whichever later step failed.
pub fn parse(content: &str) -> anyhow::Result<Self> {
// 1. Typed deserialization of everything serde handles directly.
let mut manifest: Manifest = toml::from_str(content)?;
// 2. Decode the fleet/bays tables and build the vessel list (needs the raw text).
manifest.parse_vessels(content)?;
// 3. Turn raw module entries into merged modules.
manifest.merge_modules();
// 4. Validate names, dependencies, ports and flagship settings.
manifest.validate()?;
// 5. Expand environment references; runs only on a manifest that passed validation.
manifest.expand_env_vars()?;
Ok(manifest)
}
/// Checks the parsed manifest for naming, dependency, port and flagship errors.
///
/// In order: vessel names (format and case-insensitive uniqueness across
/// ships and bays), vessel dependencies, module names, prelaunch job names
/// and their dependencies, `metrics_port` versus TCP bind ports, and, when
/// the flagship is enabled, the election mode and URL.
///
/// # Errors
/// Returns the first problem found.
fn validate(&self) -> anyhow::Result<()> {
    use std::collections::HashSet;

    // Ships and bays share one case-insensitive namespace.
    let mut seen_vessels: HashSet<String> = HashSet::new();
    for vessel in &self.vessels {
        let (name, kind, subtype) = match vessel {
            Vessel::Ship { group, config } => (&config.name, "Ship", group.as_str()),
            Vessel::Bay { bay_type, config } => (&config.name, "Bay", bay_type.as_str()),
        };
        validate_name(name, kind, Some(subtype))?;
        if !seen_vessels.insert(name.to_lowercase()) {
            anyhow::bail!(
                "Duplicate vessel name '{}' in {}.{}",
                name,
                kind.to_lowercase(),
                subtype
            );
        }
    }

    self.validate_vessel_dependencies()?;

    let mut seen_modules: HashSet<String> = HashSet::new();
    for module in &self.modules {
        validate_name(&module.name, "Module", None)?;
        if !seen_modules.insert(module.name.to_lowercase()) {
            anyhow::bail!("Duplicate module name '{}'", module.name);
        }
    }

    let mut seen_prelaunch: HashSet<String> = HashSet::new();
    for prelaunch in &self.mothership.prelaunch {
        validate_name(&prelaunch.name, "Prelaunch", None)?;
        if !seen_prelaunch.insert(prelaunch.name.to_lowercase()) {
            anyhow::bail!("Duplicate prelaunch job name '{}'", prelaunch.name);
        }
    }
    // Dependencies are checked only after every job name is known, so a job
    // may depend on one declared later in the file.
    for prelaunch in &self.mothership.prelaunch {
        for dep in &prelaunch.depends_on {
            if !seen_prelaunch.contains(&dep.to_lowercase()) {
                anyhow::bail!(
                    "Prelaunch job '{}' depends on unknown job '{}'",
                    prelaunch.name,
                    dep
                );
            }
        }
    }

    if let Some(metrics_port) = self.mothership.metrics_port {
        for (bind_name, bind) in &self.mothership.bind {
            if let Bind::Tcp { port, .. } = bind
                && *port == metrics_port
            {
                anyhow::bail!(
                    "metrics_port {} conflicts with mothership.bind.{} (same TCP port)",
                    metrics_port,
                    bind_name
                );
            }
        }
    }

    let flagship = &self.mothership.flagship;
    if flagship.enabled {
        match flagship.election.as_str() {
            "postgres" => {
                let url = flagship
                    .expanded_election_url()
                    .ok_or_else(|| {
                        anyhow::anyhow!(
                            "flagship.election_url is required when election = \"postgres\""
                        )
                    })?
                    .map_err(|e| {
                        anyhow::anyhow!("failed to expand flagship.election_url: {}", e)
                    })?;
                if !url.starts_with("postgres://") && !url.starts_with("postgresql://") {
                    // Show only the first 20 characters of the offending value.
                    // Cutting by characters rather than bytes cannot land in the
                    // middle of a multi-byte character and panic.
                    let shown: String = url.chars().take(20).collect();
                    anyhow::bail!(
                        "flagship.election_url must be a PostgreSQL URL (starts with postgres:// or postgresql://), got: {}",
                        shown
                    );
                }
            }
            "static" => {}
            other => {
                anyhow::bail!(
                    "flagship.election must be \"static\" or \"postgres\", got \"{}\"",
                    other
                );
            }
        }
    }
    Ok(())
}
pub fn validate_vessel_dependencies(&self) -> anyhow::Result<()> {
#[derive(Clone, Copy, PartialEq, Eq)]
enum VisitState {
Visiting,
Visited,
}
fn visit(
name: &str,
vessels_by_name: &HashMap<String, &Vessel>,
states: &mut HashMap<String, VisitState>,
stack: &mut Vec<String>,
) -> anyhow::Result<()> {
match states.get(name) {
Some(VisitState::Visited) => return Ok(()),
Some(VisitState::Visiting) => {
let cycle_start = stack.iter().position(|entry| entry == name).unwrap_or(0);
let mut cycle = stack[cycle_start..].to_vec();
cycle.push(name.to_string());
anyhow::bail!("Circular vessel dependency detected: {}", cycle.join(" -> "));
}
None => {}
}
states.insert(name.to_string(), VisitState::Visiting);
stack.push(name.to_string());
let vessel = vessels_by_name
.get(name)
.ok_or_else(|| anyhow::anyhow!("Unknown vessel '{name}'"))?;
for dep in vessel.depends_on() {
visit(&dep.to_lowercase(), vessels_by_name, states, stack)?;
}
stack.pop();
states.insert(name.to_string(), VisitState::Visited);
Ok(())
}
let vessels_by_name: HashMap<String, &Vessel> = self
.vessels
.iter()
.map(|vessel| (vessel.name().to_lowercase(), vessel))
.collect();
for vessel in &self.vessels {
for dep in vessel.depends_on() {
if !vessels_by_name.contains_key(&dep.to_lowercase()) {
let vessel_kind = if vessel.is_ship() { "Ship" } else { "Bay" };
anyhow::bail!(
"{} '{}' depends on unknown vessel '{}'",
vessel_kind,
vessel.name(),
dep
);
}
}
}
let mut states = HashMap::new();
let mut stack = Vec::new();
for vessel in &self.vessels {
visit(
&vessel.name().to_lowercase(),
&vessels_by_name,
&mut states,
&mut stack,
)?;
}
Ok(())
}
/// Decodes the `fleet` and `bays` tables and builds the ordered vessel list.
///
/// The typed entries come from the untyped tables captured during
/// deserialization. Their order in `vessels` comes from scanning the raw
/// manifest text for `[[fleet.<group>]]` / `[[bays.<type>]]` headers: the
/// n-th header of a group is paired with the n-th decoded entry of that group.
///
/// The header scan only looks at text that starts a line (after optional
/// blanks) and tolerates blanks inside the brackets, so commented-out
/// headers are not counted. Entries whose header the scan did not recognise
/// are appended after the ordered ones instead of being dropped.
///
/// # Errors
/// Fails when an entry cannot be decoded as a ship or bay.
fn parse_vessels(&mut self, content: &str) -> anyhow::Result<()> {
    use regex::Regex;

    let header_re = Regex::new(
        r"(?m)^[ \t]*\[\[[ \t]*(fleet|bays)[ \t]*\.[ \t]*([^\]\r\n]+?)[ \t]*\]\]",
    )
    .expect("vessel header pattern is a valid regex");
    // `captures_iter` yields matches in text order, so no sorting is needed.
    let headers: Vec<(&str, &str)> = header_re
        .captures_iter(content)
        .filter_map(|cap| Some((cap.get(1)?.as_str(), cap.get(2)?.as_str())))
        .collect();

    let fleet_table = self.fleet_raw.remove("fleet");
    let bays_table = self.fleet_raw.remove("bays");
    let mut fleet_entries: HashMap<String, Vec<ShipConfig>> = HashMap::new();
    let mut bays_entries: HashMap<String, Vec<BayConfig>> = HashMap::new();

    if let Some(toml::Value::Table(groups)) = fleet_table {
        for (group_name, ships_value) in groups {
            // Only arrays of tables are decoded; other value shapes are skipped.
            if let toml::Value::Array(ships_array) = ships_value {
                let mut ships = Vec::with_capacity(ships_array.len());
                for ship_value in ships_array {
                    let mut ship: ShipConfig = ship_value.try_into().map_err(|e| {
                        anyhow::anyhow!("Failed to parse ship in fleet.{}: {}", group_name, e)
                    })?;
                    ship.merge_base(&self.base.ship);
                    ships.push(ship);
                }
                fleet_entries.insert(group_name, ships);
            }
        }
    }

    if let Some(toml::Value::Table(bay_types)) = bays_table {
        for (bay_type, bays_value) in bay_types {
            if let toml::Value::Array(bays_array) = bays_value {
                let mut bays = Vec::with_capacity(bays_array.len());
                for bay_value in bays_array {
                    let mut bay: BayConfig = bay_value.try_into().map_err(|e| {
                        anyhow::anyhow!("Failed to parse bay in bays.{}: {}", bay_type, e)
                    })?;
                    bay.bay_type = bay_type.clone();
                    bay.merge_base(&self.base.bay);
                    bays.push(bay);
                }
                bays_entries.insert(bay_type, bays);
            }
        }
    }

    // Number of entries of each group already placed into `vessels`.
    let mut fleet_indices: HashMap<&str, usize> = HashMap::new();
    let mut bays_indices: HashMap<&str, usize> = HashMap::new();
    for (kind, subtype) in headers {
        match kind {
            "fleet" => {
                let idx = fleet_indices.entry(subtype).or_insert(0);
                if let Some(ships) = fleet_entries.get(subtype)
                    && let Some(ship) = ships.get(*idx)
                {
                    self.vessels.push(Vessel::Ship {
                        group: subtype.to_string(),
                        config: ship.clone(),
                    });
                    *idx += 1;
                }
            }
            "bays" => {
                let idx = bays_indices.entry(subtype).or_insert(0);
                if let Some(bays) = bays_entries.get(subtype)
                    && let Some(bay) = bays.get(*idx)
                {
                    self.vessels.push(Vessel::Bay {
                        bay_type: subtype.to_string(),
                        config: bay.clone(),
                    });
                    *idx += 1;
                }
            }
            _ => {}
        }
    }

    // Safety net: any decoded entry the header scan did not account for
    // (for example a quoted table key) is still added to the vessel list.
    for (group, ships) in &fleet_entries {
        let placed = fleet_indices.get(group.as_str()).copied().unwrap_or(0);
        for ship in ships.iter().skip(placed) {
            self.vessels.push(Vessel::Ship {
                group: group.clone(),
                config: ship.clone(),
            });
        }
    }
    for (bay_type, bays) in &bays_entries {
        let placed = bays_indices.get(bay_type.as_str()).copied().unwrap_or(0);
        for bay in bays.iter().skip(placed) {
            self.vessels.push(Vessel::Bay {
                bay_type: bay_type.clone(),
                config: bay.clone(),
            });
        }
    }

    self.fleet = fleet_entries;
    self.bays = bays_entries;
    Ok(())
}
/// Converts every raw module entry into a merged `Module`, emptying
/// `modules_raw` and replacing `modules` with the result.
fn merge_modules(&mut self) {
    let base = &self.base.module;
    let mut merged = Vec::new();
    for raw in self.modules_raw.drain(..) {
        merged.push(Module::from_raw(raw, base));
    }
    self.modules = merged;
}
/// Flattens `fleet` into `(group name, ship)` pairs, in map iteration order.
pub fn all_ships(&self) -> Vec<(&str, &ShipConfig)> {
    let mut pairs = Vec::new();
    for (group, ships) in &self.fleet {
        for ship in ships {
            pairs.push((group.as_str(), ship));
        }
    }
    pairs
}
/// Total number of ships across all fleet groups.
pub fn ship_count(&self) -> usize {
    self.fleet.values().map(Vec::len).sum()
}
/// Flattens `bays` into `(bay type, bay)` pairs, in map iteration order.
pub fn all_bays(&self) -> Vec<(&str, &BayConfig)> {
    let mut pairs = Vec::new();
    for (bay_type, bays) in &self.bays {
        for bay in bays {
            pairs.push((bay_type.as_str(), bay));
        }
    }
    pairs
}
/// Total number of bays across all bay types.
pub fn bay_count(&self) -> usize {
    self.bays.values().map(Vec::len).sum()
}
/// Keeps only the ships whose tags satisfy `filter`.
///
/// Groups left without ships are removed from `fleet`. The same predicate
/// is applied to the ship entries of `vessels`; bays are never removed.
pub fn filter_ships<F>(&mut self, filter: &F)
where
    F: Fn(&[String]) -> bool,
{
    self.fleet.retain(|_, ships| {
        ships.retain(|ship| filter(&ship.tags));
        !ships.is_empty()
    });
    self.vessels.retain(|vessel| match vessel {
        Vessel::Bay { .. } => true,
        Vessel::Ship { config, .. } => filter(&config.tags),
    });
}
fn expand_env_vars(&mut self) -> anyhow::Result<()> {
for ships in self.fleet.values_mut() {
for ship in ships {
expand_env_map(&mut ship.env)?;
}
}
for bays in self.bays.values_mut() {
for bay in bays {
expand_env_map(&mut bay.env)?;
expand_env_map(&mut bay.config)?;
}
}
for module in &mut self.modules {
expand_env_map(&mut module.config)?;
}
for uplink in &mut self.mothership.uplinks {
uplink.url = expand_env_vars(&uplink.url)?;
}
for prelaunch in &mut self.mothership.prelaunch {
expand_env_map(&mut prelaunch.env)?;
}
Ok(())
}
}
/// Expands environment references in `value`.
///
/// Two passes are made:
/// 1. Each `${...}` is replaced. If the text inside the braces contains a
///    space or `|`, it is run through `sh -c` and replaced by the trimmed
///    stdout; otherwise it is treated as a variable name. Replacement text
///    is not scanned again for `${`, so a value that expands to itself
///    cannot loop forever.
/// 2. Each bare `$NAME` in the result is replaced by that variable's value.
///
/// Unset variables expand to the empty string.
///
/// # Errors
/// Fails on a `${` without a closing `}` or when the shell cannot be run.
fn expand_env_vars(value: &str) -> anyhow::Result<String> {
    // Compiled once and reused; the pattern is a fixed literal.
    static BARE_VAR: std::sync::LazyLock<regex::Regex> = std::sync::LazyLock::new(|| {
        regex::Regex::new(r"\$([A-Za-z_][A-Za-z0-9_]*)")
            .expect("bare variable pattern is a valid regex")
    });

    let mut result = value.to_string();
    // Byte offset from which the next `${` is searched; always just past the
    // most recent replacement.
    let mut cursor = 0;
    while let Some(offset) = result[cursor..].find("${") {
        let start = cursor + offset;
        let end = result[start..]
            .find('}')
            .map(|i| start + i)
            .ok_or_else(|| anyhow::anyhow!("Unclosed ${{ in: {value}"))?;
        let expr = &result[start + 2..end];
        let expanded = if expr.contains(' ') || expr.contains('|') {
            let output = std::process::Command::new("sh")
                .arg("-c")
                .arg(expr)
                .output()?;
            String::from_utf8_lossy(&output.stdout).trim().to_string()
        } else {
            std::env::var(expr).unwrap_or_default()
        };
        result.replace_range(start..=end, &expanded);
        cursor = start + expanded.len();
    }

    let expanded = BARE_VAR.replace_all(&result, |caps: &regex::Captures| {
        std::env::var(&caps[1]).unwrap_or_default()
    });
    Ok(expanded.into_owned())
}
/// Replaces every value of `map` with its expanded form, in place.
///
/// # Errors
/// Stops at the first value that fails to expand; values already processed
/// keep their expanded form.
fn expand_env_map(map: &mut HashMap<String, String>) -> anyhow::Result<()> {
    map.values_mut().try_for_each(|slot| {
        let expanded = expand_env_vars(slot)?;
        *slot = expanded;
        Ok(())
    })
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bind_parse_tcp() {
let bind = Bind::parse("tcp://0.0.0.0:3000").unwrap();
match bind {
Bind::Tcp {
host,
port,
proxy_protocol,
} => {
assert_eq!(host, "0.0.0.0");
assert_eq!(port, 3000);
assert!(!proxy_protocol);
}
_ => panic!("Expected TCP bind"),
}
}
#[test]
fn test_bind_parse_unix() {
let bind = Bind::parse("unix:///tmp/app.sock").unwrap();
match bind {
Bind::Unix { path } => assert_eq!(path, "/tmp/app.sock"),
_ => panic!("Expected Unix bind"),
}
}
#[test]
fn test_bind_parse_port_only() {
let bind = Bind::parse("3000").unwrap();
match bind {
Bind::Tcp {
host,
port,
proxy_protocol,
} => {
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 3000);
assert!(!proxy_protocol);
}
_ => panic!("Expected TCP bind"),
}
}
#[test]
fn test_bind_with_proxy_protocol() {
let toml = r#"
[mothership.bind]
http = "0.0.0.0:80"
https = { addr = "0.0.0.0:443", proxy_protocol = true }
"#;
let manifest = Manifest::parse(toml).unwrap();
match &manifest.mothership.bind["http"] {
Bind::Tcp { proxy_protocol, .. } => {
assert!(!proxy_protocol);
}
_ => panic!("Expected TCP bind"),
}
match &manifest.mothership.bind["https"] {
Bind::Tcp {
host,
port,
proxy_protocol,
} => {
assert_eq!(host, "0.0.0.0");
assert_eq!(*port, 443);
assert!(*proxy_protocol);
}
_ => panic!("Expected TCP bind"),
}
assert!(!manifest.mothership.bind["http"].has_proxy_protocol());
assert!(manifest.mothership.bind["https"].has_proxy_protocol());
}
#[test]
fn test_manifest_parse_named_binds() {
let toml = r#"
[mothership]
metrics_port = 9090
[mothership.bind]
http = "0.0.0.0:80"
https = "0.0.0.0:443"
ws = "0.0.0.0:8080"
[[fleet.web]]
name = "app"
command = "ruby"
args = ["app.rb"]
bind = "tcp://127.0.0.1:3000"
healthcheck = "/health"
routes = [
{ bind = "http", pattern = "/.*" },
{ bind = "https", pattern = "/.*" },
]
[[fleet.web]]
name = "cable"
command = "anycable-go"
bind = "tcp://127.0.0.1:8081"
routes = [
{ bind = "ws", pattern = "/cable" },
]
[[fleet.workers]]
name = "sidekiq"
command = "bundle"
args = ["exec", "sidekiq"]
critical = false
[[fleet.jobs]]
name = "migrate"
command = "bundle"
args = ["exec", "rails", "db:migrate"]
oneshot = true
"#;
let manifest = Manifest::parse(toml).unwrap();
assert_eq!(manifest.mothership.bind.len(), 3);
assert!(manifest.mothership.bind.contains_key("http"));
assert!(manifest.mothership.bind.contains_key("https"));
assert!(manifest.mothership.bind.contains_key("ws"));
match &manifest.mothership.bind["http"] {
Bind::Tcp {
host,
port,
proxy_protocol,
} => {
assert_eq!(host, "0.0.0.0");
assert_eq!(*port, 80);
assert!(!proxy_protocol);
}
_ => panic!("Expected TCP bind"),
}
assert_eq!(manifest.mothership.metrics_port, Some(9090));
assert_eq!(manifest.fleet.len(), 3);
let web = &manifest.fleet["web"];
assert_eq!(web[0].name, "app");
assert_eq!(web[0].routes.len(), 2);
assert_eq!(web[0].routes[0].bind, "http");
assert_eq!(web[0].routes[0].pattern, "/.*");
let workers = &manifest.fleet["workers"];
assert!(!workers[0].critical);
let jobs = &manifest.fleet["jobs"];
assert!(jobs[0].oneshot);
}
#[test]
fn test_route_shorthand_parsing() {
let toml = r#"
[[fleet.web]]
name = "app"
command = "ruby"
routes = ["http:/.*", "https:/api/.*"]
"#;
let manifest = Manifest::parse(toml).unwrap();
let web = &manifest.fleet["web"];
assert_eq!(web[0].routes.len(), 2);
assert_eq!(web[0].routes[0].bind, "http");
assert_eq!(web[0].routes[0].pattern, "/.*");
assert_eq!(web[0].routes[1].bind, "https");
assert_eq!(web[0].routes[1].pattern, "/api/.*");
}
#[test]
fn test_route_requires_bind_prefix() {
let toml = r#"
[[fleet.web]]
name = "app"
command = "ruby"
routes = ["/.*"]
"#;
let result = Manifest::parse(toml);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("missing bind prefix")
);
}
#[test]
fn test_manifest_all_ships() {
let toml = r#"
[[fleet.web]]
name = "app"
command = "ruby"
[[fleet.workers]]
name = "sidekiq"
command = "bundle"
"#;
let manifest = Manifest::parse(toml).unwrap();
let all = manifest.all_ships();
assert_eq!(all.len(), 2);
}
#[test]
fn test_manifest_empty_binds() {
let toml = r#"
[mothership]
metrics_port = 9090
"#;
let manifest = Manifest::parse(toml).unwrap();
assert_eq!(manifest.ship_count(), 0);
assert!(manifest.mothership.bind.is_empty());
}
#[test]
fn test_manifest_minimal() {
let toml = "";
let manifest = Manifest::parse(toml).unwrap();
assert_eq!(manifest.ship_count(), 0);
assert!(manifest.mothership.bind.is_empty());
}
/// Ships are expected to inherit `[base.ship]` settings: env and tags are
/// combined with the ship's own, while `critical` and `comment` are taken from
/// the base unless the ship overrides them.
#[test]
fn test_base_ship_inheritance() {
    let input = r#"
[base.ship]
env = { RAILS_ENV = "production", LOG_LEVEL = "info" }
critical = false
tags = ["ruby", "rails"]
comment = "Default Rails ship"
[[fleet.web]]
name = "app"
command = "ruby"
env = { PORT = "3000" } # Override/extend env
tags = ["web"] # Additional tag
[[fleet.web]]
name = "api"
command = "ruby"
critical = true # Override critical
comment = "API server" # Override comment
"#;
    let manifest = Manifest::parse(input).unwrap();
    let ships = &manifest.fleet["web"];

    // "app" adds PORT and the "web" tag on top of the base values and leaves
    // `critical` / `comment` to the base template.
    let app = &ships[0];
    let expected_env = [
        ("RAILS_ENV", "production"),
        ("LOG_LEVEL", "info"),
        ("PORT", "3000"),
    ];
    for (key, value) in expected_env {
        assert_eq!(app.env.get(key).map(String::as_str), Some(value));
    }
    assert!(!app.critical);
    for tag in ["ruby", "rails", "web"] {
        assert!(app.tags.contains(&tag.to_string()));
    }
    assert_eq!(app.comment.as_deref(), Some("Default Rails ship"));

    // "api" overrides `critical` and `comment` but should still carry base tags.
    let api = &ships[1];
    assert!(api.critical);
    assert_eq!(api.comment.as_deref(), Some("API server"));
    assert!(api.tags.contains(&"ruby".to_string()));
}
/// Modules are expected to inherit `[base.module]` settings: config and tags
/// are combined with the module's own, and `phase` / `comment` come from the
/// base unless overridden.
#[test]
fn test_base_module_inheritance() {
    let input = r#"
[base.module]
phase = "request"
tags = ["security"]
config = { timeout = "30" }
comment = "Default security module"
[[modules]]
name = "auth"
wasm = "auth.wasm"
config = { secret = "xyz" }
[[modules]]
name = "rate_limit"
wasm = "rate_limit.wasm"
phase = "response" # Override
tags = ["performance"]
"#;
    let manifest = Manifest::parse(input).unwrap();

    // "auth" only adds a config key; everything else comes from the base.
    let auth = &manifest.modules[0];
    assert_eq!(auth.phase, Phase::Request);
    for (key, value) in [("timeout", "30"), ("secret", "xyz")] {
        assert_eq!(auth.config.get(key).map(String::as_str), Some(value));
    }
    assert!(auth.tags.contains(&"security".to_string()));
    assert_eq!(auth.comment.as_deref(), Some("Default security module"));

    // "rate_limit" overrides the phase and contributes an extra tag.
    let rate_limit = &manifest.modules[1];
    assert_eq!(rate_limit.phase, Phase::Response);
    for tag in ["security", "performance"] {
        assert!(rate_limit.tags.contains(&tag.to_string()));
    }
}
/// `[[bays.<type>]]` tables are expected to be grouped by bay type, with each
/// bay recording the type it was declared under.
#[test]
fn test_bays_parsing() {
    let input = r#"
[[bays.websocket]]
name = "orbitcast"
command = "./orbitcast"
args = ["--port", "9000"]
routes = ["ws:/cable"]
[[bays.grpc]]
name = "rpc-gateway"
command = "./grpc-gateway"
routes = ["http:/grpc/.*"]
"#;
    let manifest = Manifest::parse(input).unwrap();
    let bays = &manifest.bays;

    assert_eq!(bays.len(), 2);
    for bay_type in ["websocket", "grpc"] {
        assert!(bays.contains_key(bay_type));
    }

    let websocket = &bays["websocket"];
    assert_eq!(websocket.len(), 1);
    let orbitcast = &websocket[0];
    assert_eq!(orbitcast.name, "orbitcast");
    assert_eq!(orbitcast.bay_type, "websocket");
    assert_eq!(orbitcast.args, vec!["--port", "9000"]);
    // No `critical` key was given, so this pins the expected default of `true`.
    assert!(orbitcast.critical);

    let grpc = &bays["grpc"];
    assert_eq!(grpc.len(), 1);
    let gateway = &grpc[0];
    assert_eq!(gateway.name, "rpc-gateway");
    assert_eq!(gateway.bay_type, "grpc");

    // The flattened view and the counter must agree with the grouped map.
    let flattened_total = manifest.all_bays().len();
    assert_eq!(flattened_total, 2);
    assert_eq!(manifest.bay_count(), 2);
}
/// Bays are expected to inherit `[base.bay]` settings the same way ships
/// inherit `[base.ship]`: env and tags are combined, scalar fields fall back
/// to the base unless overridden.
#[test]
fn test_bays_base_inheritance() {
    let input = r#"
[base.bay]
env = { RUST_LOG = "debug" }
critical = false
tags = ["docked"]
comment = "Default bay settings"
[[bays.websocket]]
name = "orbitcast"
command = "./orbitcast"
env = { PORT = "9000" }
tags = ["realtime"]
[[bays.websocket]]
name = "actioncable"
command = "./cable"
critical = true # Override
"#;
    let manifest = Manifest::parse(input).unwrap();
    let websocket = &manifest.bays["websocket"];

    // "orbitcast" extends env/tags and keeps the base `critical` and `comment`.
    let orbitcast = &websocket[0];
    for (key, value) in [("RUST_LOG", "debug"), ("PORT", "9000")] {
        assert_eq!(orbitcast.env.get(key).map(String::as_str), Some(value));
    }
    assert!(!orbitcast.critical);
    for tag in ["docked", "realtime"] {
        assert!(orbitcast.tags.contains(&tag.to_string()));
    }
    assert_eq!(orbitcast.comment.as_deref(), Some("Default bay settings"));

    // "actioncable" explicitly overrides the base `critical = false`.
    let actioncable = &websocket[1];
    assert!(actioncable.critical);
}
/// A bay may not reuse the name of a ship; parsing must fail with a
/// "Duplicate vessel name" error.
#[test]
fn test_bay_name_conflicts_with_ship() {
    let input = r#"
[[fleet.web]]
name = "app"
command = "ruby"
[[bays.websocket]]
name = "app"
command = "./orbitcast"
"#;
    let outcome = Manifest::parse(input);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Duplicate vessel name"));
}
/// Two bays may not share a name even when they are declared under different
/// bay types; parsing must fail with a "Duplicate vessel name" error.
#[test]
fn test_duplicate_bay_names() {
    let input = r#"
[[bays.websocket]]
name = "orbitcast"
command = "./orbitcast"
[[bays.grpc]]
name = "orbitcast"
command = "./grpc"
"#;
    let outcome = Manifest::parse(input);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Duplicate vessel name"));
}
/// `vessels` is expected to list ships and bays interleaved in the order they
/// appear in the document, not grouped by kind.
#[test]
fn test_vessels_declaration_order() {
    let input = r#"
[[fleet.web]]
name = "api"
command = "ruby"
routes = ["http:/api/.*"]
[[bays.websocket]]
name = "orbitcast"
command = "./orbitcast"
routes = ["ws:/cable"]
[[fleet.web]]
name = "catchall"
command = "ruby"
routes = ["http:/.*"]
"#;
    let manifest = Manifest::parse(input).unwrap();
    let vessels = &manifest.vessels;
    assert_eq!(vessels.len(), 3);

    // ship, bay, ship -- exactly as declared above.
    let (api, orbitcast, catchall) = (&vessels[0], &vessels[1], &vessels[2]);

    assert_eq!(api.name(), "api");
    assert!(api.is_ship());

    assert_eq!(orbitcast.name(), "orbitcast");
    assert!(orbitcast.is_bay());

    assert_eq!(catchall.name(), "catchall");
    assert!(catchall.is_ship());
}
/// A `static_flagship` string of the form `$NAME` is expected to parse as an
/// environment-variable reference holding the name without the `$`.
#[test]
fn test_flagship_static_env_var() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "static"
static_flagship = "$MS_FLAGSHIP"
"#;
    let manifest = Manifest::parse(input).unwrap();
    let flagship = &manifest.mothership.flagship;

    assert!(flagship.enabled);
    assert_eq!(flagship.election, "static");

    let StaticFlagship::EnvVar(var) = &flagship.static_flagship else {
        panic!("Expected EnvVar");
    };
    assert_eq!(var, "MS_FLAGSHIP");
}
/// A `static_flagship` table with `command` / `equals` keys is expected to
/// parse as the command-based variant with both values preserved.
#[test]
fn test_flagship_static_command() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "static"
static_flagship = { command = "hostname", equals = "server-a" }
"#;
    let manifest = Manifest::parse(input).unwrap();

    let StaticFlagship::Command { command, equals } = &manifest.mothership.flagship.static_flagship
    else {
        panic!("Expected Command");
    };
    assert_eq!(command, "hostname");
    assert_eq!(equals, "server-a");
}
/// Choosing the "postgres" election without an `election_url` must be
/// rejected with an "election_url is required" error.
#[test]
fn test_flagship_postgres_requires_url() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "postgres"
"#;
    let outcome = Manifest::parse(input);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("election_url is required"));
}
/// With the "postgres" election, a non-PostgreSQL `election_url` (here a
/// `redis://` URL) must be rejected.
#[test]
fn test_flagship_postgres_url_must_be_postgres() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "postgres"
election_url = "redis://localhost:6379"
"#;
    let outcome = Manifest::parse(input);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("must be a PostgreSQL URL"));
}
/// A "postgres" election with a `postgres://` URL parses successfully and the
/// URL is stored verbatim.
#[test]
fn test_flagship_postgres_valid() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "postgres"
election_url = "postgres://localhost:5432/myapp"
"#;
    let manifest = Manifest::parse(input).unwrap();
    let flagship = &manifest.mothership.flagship;

    assert!(flagship.enabled);
    assert_eq!(flagship.election, "postgres");
    assert_eq!(
        flagship.election_url.as_deref(),
        Some("postgres://localhost:5432/myapp")
    );
}
/// `$VAR` references inside `election_url` are expected to be replaced with
/// the values of the corresponding environment variables by
/// `expanded_election_url`.
#[test]
fn test_flagship_postgres_env_expansion() {
    // Removes the listed environment variables when dropped. Using a guard
    // (instead of trailing `remove_var` calls) guarantees cleanup even when an
    // `unwrap` or assertion below panics, so a failure here cannot leak
    // variables into other tests running in the same process.
    struct EnvCleanup(&'static [&'static str]);

    impl Drop for EnvCleanup {
        fn drop(&mut self) {
            for key in self.0 {
                // SAFETY: same reasoning as for `set_var` below; only variables
                // owned by this test are removed.
                unsafe {
                    std::env::remove_var(key);
                }
            }
        }
    }

    let _cleanup = EnvCleanup(&["TEST_DB_HOST", "TEST_DB_USER"]);

    // SAFETY: the test harness may run tests on several threads, so this is
    // sound only as long as nothing else in the process touches the
    // environment through means other than `std::env` while this test runs.
    // NOTE(review): the variable names are assumed to be unique to this test
    // -- confirm no other test reads or writes them.
    unsafe {
        std::env::set_var("TEST_DB_HOST", "db.example.com");
        std::env::set_var("TEST_DB_USER", "admin");
    }

    let toml = r#"
[mothership.flagship]
enabled = true
election = "postgres"
election_url = "postgres://$TEST_DB_USER@$TEST_DB_HOST:5432/myapp"
"#;
    let manifest = Manifest::parse(toml).unwrap();
    let expanded = manifest
        .mothership
        .flagship
        .expanded_election_url()
        .unwrap()
        .unwrap();
    assert_eq!(expanded, "postgres://admin@db.example.com:5432/myapp");
}
/// An `election` value other than "static" or "postgres" must be rejected
/// with an error naming the two accepted values.
#[test]
fn test_flagship_invalid_election() {
    let input = r#"
[mothership.flagship]
enabled = true
election = "redis"
"#;
    let outcome = Manifest::parse(input);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("must be \"static\" or \"postgres\""));
}
/// Pins the values the flagship section is expected to take when only
/// `enabled` is specified.
#[test]
fn test_flagship_defaults() {
    let input = r#"
[mothership.flagship]
enabled = true
"#;
    let manifest = Manifest::parse(input).unwrap();
    let flagship = &manifest.mothership.flagship;

    assert_eq!(flagship.election, "static");
    assert_eq!(flagship.election_timeout, 30);
    assert_eq!(flagship.prelaunch_timeout, 300);
    assert_eq!(flagship.heartbeat_interval, 5);
}
}