mod contract;
mod handler;
mod parser;
mod transport;
mod validation_types;
#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
pub mod wire;
pub use contract::{
validate_transport_contract_profile, validated_transport_contract_profile,
DocumentedTransportContract, TransportContractProfile, TransportContractTier,
TransportContractViolation, TransportOperationalContract, TransportSemanticContract,
TransportStartupMode,
};
pub use handler::{TopologyHandler, TopologyHandlerBuilder};
pub use parser::{parse_topology, ParsedTopology, TopologyParseError};
pub use transport::{
ByteMessage, InMemoryChannelTransport, Transport, TransportError, TransportFactory,
TransportMessage, TransportResult, TransportType,
};
pub use validation_types::{TopologyError, TopologyLoadError, TopologyValidation};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt;
use crate::identifiers::{Endpoint as TopologyEndpoint, Region, RoleName};
use crate::ChannelCapacity;
use telltale_types::{
canonical_transport_boundaries, PlacementObservation, TransportBoundaryObservation,
};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum Location {
#[default]
Local,
Remote(TopologyEndpoint),
Colocated(RoleName),
}
impl fmt::Display for Location {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Location::Local => write!(f, "local"),
Location::Remote(endpoint) => write!(f, "{}", endpoint),
Location::Colocated(peer) => write!(f, "colocated({})", peer),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopologyConstraint {
Colocated(RoleName, RoleName),
Separated(RoleName, RoleName),
Pinned(RoleName, Location),
Region(RoleName, Region),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BranchRequirement {
pub sender: RoleName,
pub receiver: RoleName,
pub label_count: u32,
}
impl BranchRequirement {
pub fn new(sender: RoleName, receiver: RoleName, label_count: u32) -> Self {
Self {
sender,
receiver,
label_count,
}
}
#[must_use]
pub fn required_capacity_bits(&self) -> u32 {
min_capacity_bits(self.label_count)
}
}
fn min_capacity_bits(label_count: u32) -> u32 {
if label_count <= 1 {
return 0;
}
32 - (label_count - 1).leading_zeros()
}
impl fmt::Display for TopologyConstraint {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TopologyConstraint::Colocated(r1, r2) => write!(f, "colocated({}, {})", r1, r2),
TopologyConstraint::Separated(r1, r2) => write!(f, "separated({}, {})", r1, r2),
TopologyConstraint::Pinned(role, loc) => write!(f, "pinned({}, {})", role, loc),
TopologyConstraint::Region(role, region) => write!(f, "region({}, {})", role, region),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TopologyMode {
#[default]
Local,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RoleFamilyConstraint {
pub min: u32,
pub max: Option<u32>,
}
impl RoleFamilyConstraint {
pub fn min_only(min: u32) -> Self {
Self { min, max: None }
}
pub fn bounded(min: u32, max: u32) -> Self {
Self {
min,
max: Some(max),
}
}
pub fn validate(&self, count: usize) -> Result<(), RoleFamilyConstraintError> {
let count = u32::try_from(count).map_err(|_| RoleFamilyConstraintError::AboveMaximum {
actual: u32::MAX,
max: self.max.unwrap_or(u32::MAX),
})?;
if count < self.min {
return Err(RoleFamilyConstraintError::BelowMinimum {
actual: count,
min: self.min,
});
}
if let Some(max) = self.max {
if count > max {
return Err(RoleFamilyConstraintError::AboveMaximum { actual: count, max });
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoleFamilyConstraintError {
BelowMinimum { actual: u32, min: u32 },
AboveMaximum { actual: u32, max: u32 },
}
impl fmt::Display for RoleFamilyConstraintError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RoleFamilyConstraintError::BelowMinimum { actual, min } => {
write!(
f,
"role family has {} instances, minimum required is {}",
actual, min
)
}
RoleFamilyConstraintError::AboveMaximum { actual, max } => {
write!(
f,
"role family has {} instances, maximum allowed is {}",
actual, max
)
}
}
}
}
impl std::error::Error for RoleFamilyConstraintError {}
#[derive(Debug, Clone, Default)]
pub struct Topology {
pub mode: Option<TopologyMode>,
pub locations: BTreeMap<RoleName, Location>,
pub channel_capacities: BTreeMap<(RoleName, RoleName), ChannelCapacity>,
pub constraints: Vec<TopologyConstraint>,
pub role_constraints: BTreeMap<String, RoleFamilyConstraint>,
}
impl Topology {
fn explicit_region(&self, role: &RoleName) -> Result<Option<Region>, String> {
let mut regions = self
.constraints
.iter()
.filter_map(|constraint| match constraint {
TopologyConstraint::Region(candidate, region) if candidate == role => Some(region),
_ => None,
});
let Some(first) = regions.next() else {
return Ok(None);
};
if let Some(conflict) = regions.find(|region| *region != first) {
return Err(format!(
"role {role} has conflicting region constraints ({first} vs {conflict})"
));
}
Ok(Some(first.clone()))
}
fn resolved_location(
&self,
role: &RoleName,
visiting: &mut BTreeSet<RoleName>,
) -> Result<Location, String> {
if !visiting.insert(role.clone()) {
return Err(format!("cyclic colocated placement involving role {role}"));
}
let resolved = match self.get_location(role) {
Ok(Location::Colocated(peer)) => self.resolved_location(&peer, visiting),
Ok(location) => Ok(location),
Err(TopologyError::UnknownRole(missing)) => Err(format!(
"role {role} refers to unknown colocated peer {missing}"
)),
};
visiting.remove(role);
resolved
}
fn resolved_region(
&self,
role: &RoleName,
visiting: &mut BTreeSet<RoleName>,
) -> Result<Option<Region>, String> {
if !visiting.insert(role.clone()) {
return Err(format!("cyclic colocated placement involving role {role}"));
}
let explicit = self.explicit_region(role)?;
let inherited = match self.get_location(role) {
Ok(Location::Colocated(peer)) => self.resolved_region(&peer, visiting)?,
Ok(Location::Local | Location::Remote(_)) => None,
Err(TopologyError::UnknownRole(missing)) => {
visiting.remove(role);
return Err(format!(
"role {role} refers to unknown colocated peer {missing}"
));
}
};
visiting.remove(role);
match (explicit, inherited) {
(Some(explicit), Some(inherited)) if explicit != inherited => Err(format!(
"role {role} declares region {explicit} but colocated peer resolves to {inherited}"
)),
(Some(explicit), _) => Ok(Some(explicit)),
(None, inherited) => Ok(inherited),
}
}
pub fn region_for_role(&self, role: &RoleName) -> Result<Option<Region>, String> {
self.resolved_region(role, &mut BTreeSet::new())
}
pub fn placement_observations_for_roles<I, R>(
&self,
roles: I,
) -> Result<Vec<PlacementObservation>, String>
where
I: IntoIterator<Item = R>,
R: AsRef<str>,
{
let mut observations = Vec::new();
for role in roles {
let role_name =
RoleName::new(role.as_ref().to_string()).map_err(|err| err.to_string())?;
let region = self
.region_for_role(&role_name)?
.map(|region| region.to_string());
let observation = match self.get_location(&role_name) {
Ok(Location::Local) => PlacementObservation::local(role_name.to_string()),
Ok(Location::Remote(endpoint)) => {
PlacementObservation::remote(role_name.to_string(), endpoint.to_string())
}
Ok(Location::Colocated(peer)) => {
PlacementObservation::colocated(role_name.to_string(), peer.to_string())
}
Err(TopologyError::UnknownRole(_)) => {
return Err(format!(
"placement observation requested unknown role {role_name}"
));
}
};
observations.push(match region {
Some(region) => observation.with_region(region),
None => observation,
});
}
telltale_types::canonicalize_placement_observations(&observations)
}
pub fn transport_boundaries_for_roles<I, R>(
&self,
roles: I,
) -> Result<Vec<TransportBoundaryObservation>, String>
where
I: IntoIterator<Item = R>,
R: AsRef<str>,
{
let observations = self.placement_observations_for_roles(roles)?;
canonical_transport_boundaries(&observations)
}
fn resolve_constraint_location(&self, location: &Location) -> Result<Location, String> {
match location {
Location::Colocated(peer) => self.resolved_location(peer, &mut BTreeSet::new()),
other => Ok(other.clone()),
}
}
fn validate_constraint(&self, constraint: &TopologyConstraint) -> Option<TopologyValidation> {
let resolved = |role: &RoleName| match self.resolved_location(role, &mut BTreeSet::new()) {
Ok(location) => Ok(location),
Err(reason) => Err(TopologyValidation::ConstraintViolation(
constraint.clone(),
reason,
)),
};
let resolve_expected =
|location: &Location| match self.resolve_constraint_location(location) {
Ok(location) => Ok(location),
Err(reason) => Err(TopologyValidation::ConstraintViolation(
constraint.clone(),
reason,
)),
};
match constraint {
TopologyConstraint::Colocated(left, right) => {
let left_location = match resolved(left) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
let right_location = match resolved(right) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
if left_location == right_location {
None
} else {
Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
format!(
"roles {left} and {right} resolve to different locations ({left_location} vs {right_location})"
),
))
}
}
TopologyConstraint::Separated(left, right) => {
let left_location = match resolved(left) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
let right_location = match resolved(right) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
if left_location != right_location {
None
} else {
Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
format!(
"roles {left} and {right} resolve to the same location ({left_location})"
),
))
}
}
TopologyConstraint::Pinned(role, expected) => {
let actual = match resolved(role) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
let expected = match resolve_expected(expected) {
Ok(location) => location,
Err(validation) => return Some(validation),
};
if actual == expected {
None
} else {
Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
format!(
"role {role} resolved to {actual}, expected pinned location {expected}"
),
))
}
}
TopologyConstraint::Region(role, region) => {
match self.resolved_region(role, &mut BTreeSet::new()) {
Ok(Some(actual)) if &actual == region => None,
Ok(Some(actual)) => Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
format!("role {role} resolved to region {actual}, expected {region}"),
)),
Ok(None) => Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
format!("role {role} has no resolved region, expected {region}"),
)),
Err(reason) => Some(TopologyValidation::ConstraintViolation(
constraint.clone(),
reason,
)),
}
}
}
}
pub fn new() -> Self {
Self::default()
}
pub fn local_mode() -> Self {
Topology {
mode: Some(TopologyMode::Local),
..Default::default()
}
}
pub fn builder() -> TopologyBuilder {
TopologyBuilder::new()
}
pub fn with_role(mut self, role: RoleName, location: Location) -> Self {
self.locations.insert(role, location);
self
}
pub fn with_constraint(mut self, constraint: TopologyConstraint) -> Self {
self.constraints.push(constraint);
self
}
pub fn with_channel_capacity(
mut self,
sender: RoleName,
receiver: RoleName,
capacity: ChannelCapacity,
) -> Self {
self.channel_capacities.insert((sender, receiver), capacity);
self
}
pub fn get_location(&self, role: &RoleName) -> Result<Location, TopologyError> {
match &self.mode {
Some(TopologyMode::Local) => Ok(Location::Local),
_ => self
.locations
.get(role)
.cloned()
.ok_or_else(|| TopologyError::UnknownRole(role.clone())),
}
}
pub fn is_local(&self, role: &RoleName) -> Result<bool, TopologyError> {
match self.get_location(role)? {
Location::Local | Location::Colocated(_) => Ok(true),
Location::Remote(_) => Ok(false),
}
}
pub fn roles(&self) -> Vec<&RoleName> {
self.locations.keys().collect()
}
pub fn channel_capacity(
&self,
sender: &RoleName,
receiver: &RoleName,
) -> Option<ChannelCapacity> {
self.channel_capacities
.get(&(sender.clone(), receiver.clone()))
.copied()
}
pub fn valid_for_roles(&self, choreo_roles: &[RoleName]) -> bool {
let topo_roles_ok = self.locations.keys().all(|r| choreo_roles.contains(r));
let capacity_roles_ok = self
.channel_capacities
.keys()
.all(|(s, r)| choreo_roles.contains(s) && choreo_roles.contains(r));
let constraints_ok = self.constraints.iter().all(|c| match c {
TopologyConstraint::Colocated(r1, r2) | TopologyConstraint::Separated(r1, r2) => {
choreo_roles.contains(r1) && choreo_roles.contains(r2)
}
TopologyConstraint::Pinned(r, _) | TopologyConstraint::Region(r, _) => {
choreo_roles.contains(r)
}
});
topo_roles_ok && capacity_roles_ok && constraints_ok
}
pub fn validate(&self, choreo_roles: &[RoleName]) -> TopologyValidation {
for (role, location) in &self.locations {
if !choreo_roles.contains(role) {
return TopologyValidation::UnknownRole(role.clone());
}
if let Location::Colocated(peer) = location {
if !choreo_roles.contains(peer) {
return TopologyValidation::UnknownRole(peer.clone());
}
}
}
if !matches!(self.mode, Some(TopologyMode::Local)) {
for role in choreo_roles {
if !self.locations.contains_key(role) {
return TopologyValidation::MissingRole(role.clone());
}
}
}
for (sender, receiver) in self.channel_capacities.keys() {
if !choreo_roles.contains(sender) {
return TopologyValidation::UnknownRole(sender.clone());
}
if !choreo_roles.contains(receiver) {
return TopologyValidation::UnknownRole(receiver.clone());
}
}
for c in &self.constraints {
match c {
TopologyConstraint::Colocated(r1, r2) | TopologyConstraint::Separated(r1, r2) => {
if !choreo_roles.contains(r1) {
return TopologyValidation::UnknownRole(r1.clone());
}
if !choreo_roles.contains(r2) {
return TopologyValidation::UnknownRole(r2.clone());
}
}
TopologyConstraint::Pinned(r, _) | TopologyConstraint::Region(r, _) => {
if !choreo_roles.contains(r) {
return TopologyValidation::UnknownRole(r.clone());
}
}
}
}
for constraint in &self.constraints {
if let Some(validation) = self.validate_constraint(constraint) {
return validation;
}
}
TopologyValidation::Valid
}
pub fn validate_with_branches(
&self,
choreo_roles: &[RoleName],
branches: &[BranchRequirement],
) -> TopologyValidation {
let base = self.validate(choreo_roles);
if !base.is_valid() {
return base;
}
for branch in branches {
let required_bits = branch.required_capacity_bits();
if required_bits == 0 {
continue;
}
if let Some(available) = self.channel_capacity(&branch.sender, &branch.receiver) {
let available_bits = available.get();
if available_bits < required_bits {
return TopologyValidation::InsufficientCapacity {
sender: branch.sender.clone(),
receiver: branch.receiver.clone(),
required_bits,
available_bits,
};
}
}
}
TopologyValidation::Valid
}
pub fn validate_family(
&self,
family: &str,
count: usize,
) -> Result<(), RoleFamilyConstraintError> {
if let Some(constraint) = self.role_constraints.get(family) {
constraint.validate(count)
} else {
Ok(())
}
}
pub fn get_family_constraint(&self, family: &str) -> Option<&RoleFamilyConstraint> {
self.role_constraints.get(family)
}
pub fn load(path: impl AsRef<std::path::Path>) -> Result<ParsedTopology, TopologyLoadError> {
let content = std::fs::read_to_string(path.as_ref())
.map_err(|e| TopologyLoadError::IoError(e.to_string()))?;
parse_topology(&content).map_err(TopologyLoadError::ParseError)
}
pub fn parse(content: &str) -> Result<ParsedTopology, TopologyLoadError> {
parse_topology(content).map_err(TopologyLoadError::ParseError)
}
}
#[derive(Debug, Clone, Default)]
pub struct TopologyBuilder {
topology: Topology,
}
impl TopologyBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn mode(mut self, mode: TopologyMode) -> Self {
self.topology.mode = Some(mode);
self
}
pub fn local_role(mut self, role: RoleName) -> Self {
self.topology.locations.insert(role, Location::Local);
self
}
pub fn remote_role(mut self, role: RoleName, endpoint: TopologyEndpoint) -> Self {
self.topology
.locations
.insert(role, Location::Remote(endpoint));
self
}
pub fn colocated_role(mut self, role: RoleName, peer: RoleName) -> Self {
self.topology
.locations
.insert(role, Location::Colocated(peer));
self
}
pub fn role(mut self, role: RoleName, location: Location) -> Self {
self.topology.locations.insert(role, location);
self
}
pub fn channel_capacity(
mut self,
sender: RoleName,
receiver: RoleName,
capacity: ChannelCapacity,
) -> Self {
self.topology
.channel_capacities
.insert((sender, receiver), capacity);
self
}
pub fn colocated(mut self, r1: RoleName, r2: RoleName) -> Self {
self.topology
.constraints
.push(TopologyConstraint::Colocated(r1, r2));
self
}
pub fn separated(mut self, r1: RoleName, r2: RoleName) -> Self {
self.topology
.constraints
.push(TopologyConstraint::Separated(r1, r2));
self
}
pub fn pinned(mut self, role: RoleName, location: Location) -> Self {
self.topology
.constraints
.push(TopologyConstraint::Pinned(role, location));
self
}
pub fn region(mut self, role: RoleName, region: Region) -> Self {
self.topology
.constraints
.push(TopologyConstraint::Region(role, region));
self
}
pub fn role_family_constraint(
mut self,
family: impl Into<String>,
constraint: RoleFamilyConstraint,
) -> Self {
self.topology
.role_constraints
.insert(family.into(), constraint);
self
}
pub fn build(self) -> Topology {
self.topology
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_location_display() {
assert_eq!(Location::Local.to_string(), "local");
assert_eq!(
Location::Remote(TopologyEndpoint::new("localhost:8080").unwrap()).to_string(),
"localhost:8080"
);
assert_eq!(
Location::Colocated(RoleName::from_static("Alice")).to_string(),
"colocated(Alice)"
);
}
#[test]
fn test_topology_builder() {
let topology = Topology::builder()
.mode(TopologyMode::Local)
.local_role(RoleName::from_static("Alice"))
.remote_role(
RoleName::from_static("Bob"),
TopologyEndpoint::new("localhost:8080").unwrap(),
)
.colocated(
RoleName::from_static("Alice"),
RoleName::from_static("Carol"),
)
.build();
assert_eq!(topology.mode, Some(TopologyMode::Local));
assert_eq!(topology.locations.len(), 2);
assert!(topology.is_local(&RoleName::from_static("Alice")).unwrap());
}
#[test]
fn test_topology_validation() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.local_role(RoleName::from_static("Bob"))
.build();
let roles = vec![
RoleName::from_static("Alice"),
RoleName::from_static("Bob"),
RoleName::from_static("Carol"),
];
match topology.validate(&roles) {
TopologyValidation::MissingRole(role) => {
assert_eq!(role, RoleName::from_static("Carol"))
}
other => panic!("Expected MissingRole, got {other:?}"),
}
let limited_roles = vec![RoleName::from_static("Alice")];
assert!(!topology.validate(&limited_roles).is_valid());
}
#[test]
fn test_local_mode_allows_implicit_role_coverage() {
let topology = Topology::builder().mode(TopologyMode::Local).build();
let roles = vec![
RoleName::from_static("Alice"),
RoleName::from_static("Bob"),
RoleName::from_static("Carol"),
];
assert!(topology.validate(&roles).is_valid());
}
#[test]
fn test_topology_capacity_validation() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.local_role(RoleName::from_static("Bob"))
.channel_capacity(
RoleName::from_static("Alice"),
RoleName::from_static("Bob"),
ChannelCapacity::try_new(1).expect("test capacity in range"),
)
.build();
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
let branches = vec![BranchRequirement::new(
RoleName::from_static("Alice"),
RoleName::from_static("Bob"),
3,
)];
match topology.validate_with_branches(&roles, &branches) {
TopologyValidation::InsufficientCapacity {
sender,
receiver,
required_bits,
available_bits,
} => {
assert_eq!(sender, RoleName::from_static("Alice"));
assert_eq!(receiver, RoleName::from_static("Bob"));
assert_eq!(required_bits, 2);
assert_eq!(available_bits, 1);
}
_ => panic!("Expected InsufficientCapacity"),
}
}
#[test]
fn test_topology_capacity_unconstrained() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.local_role(RoleName::from_static("Bob"))
.build();
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
let branches = vec![BranchRequirement::new(
RoleName::from_static("Alice"),
RoleName::from_static("Bob"),
4,
)];
assert!(topology
.validate_with_branches(&roles, &branches)
.is_valid());
}
#[test]
fn test_local_mode() {
let topology = Topology::local_mode();
assert_eq!(
topology
.get_location(&RoleName::from_static("AnyRole"))
.unwrap(),
Location::Local
);
}
#[test]
fn test_constraint_validation() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.local_role(RoleName::from_static("Bob"))
.colocated(
RoleName::from_static("Alice"),
RoleName::from_static("Unknown"),
)
.build();
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
match topology.validate(&roles) {
TopologyValidation::UnknownRole(role) => {
assert_eq!(role, RoleName::from_static("Unknown"))
}
_ => panic!("Expected UnknownRole"),
}
}
#[test]
fn test_constraint_validation_enforces_placement_requirements() {
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
let separated = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.local_role(RoleName::from_static("Bob"))
.separated(RoleName::from_static("Alice"), RoleName::from_static("Bob"))
.build();
match separated.validate(&roles) {
TopologyValidation::ConstraintViolation(
TopologyConstraint::Separated(left, right),
_,
) => {
assert_eq!(left, RoleName::from_static("Alice"));
assert_eq!(right, RoleName::from_static("Bob"));
}
other => panic!("Expected separated placement violation, got {other:?}"),
}
let pinned = Topology::builder()
.remote_role(
RoleName::from_static("Alice"),
TopologyEndpoint::new("localhost:9000").unwrap(),
)
.local_role(RoleName::from_static("Bob"))
.pinned(
RoleName::from_static("Bob"),
Location::Remote(TopologyEndpoint::new("localhost:9001").unwrap()),
)
.build();
match pinned.validate(&roles) {
TopologyValidation::ConstraintViolation(TopologyConstraint::Pinned(role, _), _) => {
assert_eq!(role, RoleName::from_static("Bob"));
}
other => panic!("Expected pinned placement violation, got {other:?}"),
}
let region = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.colocated_role(RoleName::from_static("Bob"), RoleName::from_static("Alice"))
.region(
RoleName::from_static("Alice"),
Region::new("membership").expect("region"),
)
.build();
assert!(region.validate(&roles).is_valid());
assert_eq!(
region
.region_for_role(&RoleName::from_static("Alice"))
.expect("region for Alice"),
Some(Region::new("membership").unwrap())
);
assert_eq!(
region
.region_for_role(&RoleName::from_static("Bob"))
.expect("region inherited by colocated Bob"),
Some(Region::new("membership").unwrap())
);
}
#[test]
fn test_colocated_roles_require_known_peers() {
let topology = Topology::builder()
.colocated_role(RoleName::from_static("Bob"), RoleName::from_static("Carol"))
.build();
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
match topology.validate(&roles) {
TopologyValidation::UnknownRole(role) => {
assert_eq!(role, RoleName::from_static("Carol"));
}
other => panic!("Expected missing colocated peer role, got {other:?}"),
}
}
#[test]
fn test_conflicting_region_constraints_reject_validation() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.colocated_role(RoleName::from_static("Bob"), RoleName::from_static("Alice"))
.region(
RoleName::from_static("Alice"),
Region::new("membership").expect("region"),
)
.region(
RoleName::from_static("Bob"),
Region::new("archive").expect("region"),
)
.build();
let roles = vec![RoleName::from_static("Alice"), RoleName::from_static("Bob")];
match topology.validate(&roles) {
TopologyValidation::ConstraintViolation(
TopologyConstraint::Region(role, region),
msg,
) => {
assert_eq!(role, RoleName::from_static("Bob"));
assert_eq!(region, Region::new("archive").unwrap());
assert!(msg.contains("colocated peer resolves"));
}
other => panic!("Expected conflicting region violation, got {other:?}"),
}
}
#[test]
fn test_topology_exports_canonical_reconfiguration_placement_artifacts() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Alice"))
.colocated_role(RoleName::from_static("Bob"), RoleName::from_static("Alice"))
.remote_role(
RoleName::from_static("Carol"),
TopologyEndpoint::new("127.0.0.1:19841").unwrap(),
)
.region(
RoleName::from_static("Alice"),
Region::new("eu_central_1").expect("region"),
)
.region(
RoleName::from_static("Carol"),
Region::new("us_east_1").expect("region"),
)
.build();
let placements = topology
.placement_observations_for_roles(["Alice", "Bob", "Carol"])
.expect("placement observations");
assert_eq!(
placements,
vec![
telltale_types::PlacementObservation::local("Alice").with_region("eu_central_1"),
telltale_types::PlacementObservation::colocated("Bob", "Alice")
.with_region("eu_central_1"),
telltale_types::PlacementObservation::remote("Carol", "127.0.0.1:19841")
.with_region("us_east_1"),
]
);
let boundaries = topology
.transport_boundaries_for_roles(["Alice", "Bob", "Carol"])
.expect("transport boundaries");
assert!(
boundaries.iter().any(|boundary| matches!(
boundary.boundary,
telltale_types::TransportBoundaryKind::SharedMemory
)),
"topology should expose colocated shared-memory boundaries"
);
assert!(
boundaries.iter().any(|boundary| matches!(
boundary.boundary,
telltale_types::TransportBoundaryKind::Network
) && boundary.cross_region),
"topology should expose cross-region network boundaries"
);
}
#[test]
fn test_topology_from_str() {
let input = r#"
topology Dev for PingPong {
Alice: localhost:8080
Bob: localhost:8081
}
"#;
let parsed = Topology::parse(input).unwrap();
assert_eq!(parsed.name, "Dev");
assert_eq!(parsed.for_choreography, "PingPong");
assert_eq!(
parsed
.topology
.get_location(&RoleName::from_static("Alice"))
.unwrap(),
Location::Remote(TopologyEndpoint::new("localhost:8080").unwrap())
);
}
#[test]
fn test_topology_from_str_local_mode() {
let input = r#"
topology Test for MyProtocol {
mode: local
}
"#;
let parsed = Topology::parse(input).unwrap();
assert_eq!(parsed.topology.mode, Some(TopologyMode::Local));
assert_eq!(
parsed
.topology
.get_location(&RoleName::from_static("AnyRole"))
.unwrap(),
Location::Local
);
}
#[test]
fn test_topology_load_error() {
let result = Topology::load("nonexistent/file.topology");
assert!(result.is_err());
match result {
Err(TopologyLoadError::IoError(_)) => {}
_ => panic!("Expected IoError"),
}
}
#[test]
fn test_role_family_constraint_min_only() {
let constraint = RoleFamilyConstraint::min_only(3);
assert!(constraint.validate(3).is_ok());
assert!(constraint.validate(5).is_ok());
assert!(constraint.validate(100).is_ok());
assert!(constraint.validate(2).is_err());
assert!(constraint.validate(0).is_err());
}
#[test]
fn test_role_family_constraint_bounded() {
let constraint = RoleFamilyConstraint::bounded(2, 5);
assert!(constraint.validate(2).is_ok());
assert!(constraint.validate(3).is_ok());
assert!(constraint.validate(5).is_ok());
assert!(constraint.validate(1).is_err());
assert!(constraint.validate(6).is_err());
}
#[test]
fn test_role_family_constraint_error_messages() {
let constraint = RoleFamilyConstraint::bounded(3, 10);
let err = constraint.validate(2).unwrap_err();
assert!(err.to_string().contains("minimum required is 3"));
let err = constraint.validate(11).unwrap_err();
assert!(err.to_string().contains("maximum allowed is 10"));
}
#[test]
fn test_topology_validate_family() {
let input = r#"
topology Test for Protocol {
role_constraints {
Witness: min = 3, max = 10
}
}
"#;
let parsed = Topology::parse(input).unwrap();
let topology = parsed.topology;
assert!(topology.validate_family("Witness", 3).is_ok());
assert!(topology.validate_family("Witness", 5).is_ok());
assert!(topology.validate_family("Witness", 10).is_ok());
assert!(topology.validate_family("Witness", 2).is_err());
assert!(topology.validate_family("Witness", 11).is_err());
assert!(topology.validate_family("Unknown", 0).is_ok());
assert!(topology.validate_family("Unknown", 100).is_ok());
}
#[test]
fn test_topology_get_family_constraint() {
let input = r#"
topology Test for Protocol {
role_constraints {
Witness: min = 3
}
}
"#;
let parsed = Topology::parse(input).unwrap();
let topology = parsed.topology;
let constraint = topology.get_family_constraint("Witness");
assert!(constraint.is_some());
assert_eq!(constraint.unwrap().min, 3);
let unknown = topology.get_family_constraint("Unknown");
assert!(unknown.is_none());
}
#[test]
fn test_topology_builder_preserves_role_family_constraints() {
let topology = Topology::builder()
.local_role(RoleName::from_static("Coordinator"))
.role_family_constraint("Witness", RoleFamilyConstraint::bounded(2, 5))
.build();
assert_eq!(
topology.get_family_constraint("Witness"),
Some(&RoleFamilyConstraint::bounded(2, 5))
);
assert!(topology.validate_family("Witness", 2).is_ok());
assert!(topology.validate_family("Witness", 6).is_err());
}
}