#![warn(missing_docs)]
#![doc = include_str!("../README.md")]
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use docker_network::NetworkManager;
use docker_types::DockerError;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::Arc,
};
use tokio::sync::RwLock;
use uuid::Uuid;
/// Result alias used throughout this crate; the error type is always [`DockerError`].
pub type Result<T> = std::result::Result<T, DockerError>;
/// Lifecycle state recorded on a [`ServiceInstance`].
///
/// The load balancers in this file only hand out instances whose status is
/// [`ServiceStatus::Running`] and whose `health_status` flag is `true`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum ServiceStatus {
    /// The instance is running; the only status eligible for load balancing.
    Running,
    /// The instance is stopped.
    Stopped,
    /// The instance is marked unhealthy.
    Unhealthy,
    /// The instance is starting.
    Starting,
    /// The instance is stopping.
    Stopping,
}
/// One addressable instance that belongs to a registered service.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceInstance {
    /// Identifier of this instance (`ServiceManagerImpl` generates a UUID v4 string).
    pub id: String,
    /// Identifier of the owning service; the load balancers group instances by this key.
    pub service_id: String,
    /// Identifier of the backing container (presumably a Docker container id — confirm with callers).
    pub container_id: String,
    /// Name of the backing container.
    pub container_name: String,
    /// Socket address used for health probes and service calls.
    pub address: SocketAddr,
    /// Lifecycle state of the instance.
    pub status: ServiceStatus,
    /// Result of the most recent health probe; `true` means healthy.
    pub health_status: bool,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last modification (UTC); refreshed by health checks.
    pub updated_at: DateTime<Utc>,
}
/// Names the load-balancing algorithm a service asks for.
///
/// `ServiceDiscoveryManager::create_load_balancer` maps each variant to the
/// matching [`LoadBalancer`] implementation in this file.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum LoadBalancingStrategy {
    /// Cycle through the healthy instances in order.
    RoundRobin,
    /// Pick a healthy instance at random.
    Random,
    /// Pick the healthy instance with the fewest tracked connections.
    LeastConnections,
    /// Pick a healthy instance from a hash of the client IP address.
    IpHash,
}
/// Parameters supplied by a caller when registering a new service.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceConfig {
    /// Service name; discovery by name looks services up under this key.
    pub name: String,
    /// Port associated with the service.
    pub port: u16,
    /// Identifier of the network the service is attached to.
    pub network_id: String,
    /// Requested load-balancing strategy.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// HTTP path to probe for health checks; when `None`, health checks fall
    /// back to a plain TCP connect.
    pub health_check_path: Option<String>,
    /// Interval between health checks.
    /// NOTE(review): the unit is not established anywhere in this file — confirm (seconds?).
    pub health_check_interval: Option<u64>,
    /// Free-form key/value labels attached to the service.
    pub labels: HashMap<String, String>,
}
/// Registry record for a service: its configuration plus its current instances.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServiceInfo {
    /// Identifier assigned at registration (`ServiceDiscoveryManager` generates a UUID v4 string).
    pub id: String,
    /// Service name, copied from the [`ServiceConfig`].
    pub name: String,
    /// Port associated with the service.
    pub port: u16,
    /// Identifier of the network the service is attached to.
    pub network_id: String,
    /// Requested load-balancing strategy.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// HTTP path to probe for health checks; `None` selects a TCP connect probe.
    pub health_check_path: Option<String>,
    /// Interval between health checks.
    /// NOTE(review): the unit is not established anywhere in this file — confirm.
    pub health_check_interval: Option<u64>,
    /// Instances currently registered for this service.
    pub instances: Vec<ServiceInstance>,
    /// Free-form key/value labels attached to the service.
    pub labels: HashMap<String, String>,
    /// Registration timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last change to the record (UTC).
    pub updated_at: DateTime<Utc>,
}
/// Strategy object that tracks the instances of each service and picks one per request.
#[async_trait]
pub trait LoadBalancer: Send + Sync {
    /// Picks an instance of `service_id`.
    ///
    /// `client_ip` is available to strategies that route by client address.
    /// The implementations in this file fail with a not-found error for an
    /// unknown service and with an internal error when no instance is both
    /// running and healthy.
    async fn select_instance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance>;
    /// Replaces the tracked copy of `instance` (matched by `instance.id` within `instance.service_id`).
    async fn update_instance(&self, instance: &ServiceInstance) -> Result<()>;
    /// Starts tracking `instance` under its `service_id`.
    async fn add_instance(&self, instance: &ServiceInstance) -> Result<()>;
    /// Stops tracking the instance with the given id.
    async fn remove_instance(&self, instance_id: &str) -> Result<()>;
}
/// Registry of services and their instances, with lookup, health checking and load balancing.
#[async_trait]
pub trait ServiceDiscovery: Send + Sync {
    /// Registers a new service described by `config` and returns its record.
    async fn register_service(&self, config: &ServiceConfig) -> Result<ServiceInfo>;
    /// Removes the service with the given id from the registry.
    async fn deregister_service(&self, service_id: &str) -> Result<()>;
    /// Adds `instance` to the service identified by `service_id`.
    async fn register_instance(&self, service_id: &str, instance: &ServiceInstance) -> Result<ServiceInstance>;
    /// Removes the instance `instance_id` from the service `service_id`.
    async fn deregister_instance(&self, service_id: &str, instance_id: &str) -> Result<()>;
    /// Looks a service up by name.
    async fn discover_service(&self, service_name: &str) -> Result<ServiceInfo>;
    /// Returns every registered service.
    async fn list_services(&self) -> Result<Vec<ServiceInfo>>;
    /// Probes the instances of `service_id` and records their health.
    async fn health_check(&self, service_id: &str) -> Result<()>;
    /// Picks an instance of `service_id` for a request, optionally on behalf of `client_ip`.
    async fn load_balance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance>;
}
/// High-level facade for managing services, their instances and calls between services.
#[async_trait]
pub trait ServiceManager: Send + Sync {
    /// Creates a service from `config` and returns its record.
    async fn create_service(&self, config: &ServiceConfig) -> Result<ServiceInfo>;
    /// Deletes the service with the given id.
    async fn delete_service(&self, service_id: &str) -> Result<()>;
    /// Creates an instance for `service_id` backed by the given container and
    /// reachable at `address`, and returns the new instance.
    async fn add_service_instance(
        &self,
        service_id: &str,
        container_id: &str,
        container_name: &str,
        address: SocketAddr,
    ) -> Result<ServiceInstance>;
    /// Removes the instance `instance_id` from the service `service_id`.
    async fn remove_service_instance(&self, service_id: &str, instance_id: &str) -> Result<()>;
    /// Returns every known service.
    async fn list_services(&self) -> Result<Vec<ServiceInfo>>;
    /// Returns the record of the service with the given id.
    async fn inspect_service(&self, service_id: &str) -> Result<ServiceInfo>;
    /// Sends `request` to an instance of the service named `target_service_name`
    /// and returns the raw response bytes.
    async fn service_to_service_call(
        &self,
        source_service_id: &str,
        target_service_name: &str,
        request: Vec<u8>,
    ) -> Result<Vec<u8>>;
    /// Picks an instance of `service_id` for a request, optionally on behalf of `client_ip`.
    async fn load_balance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance>;
}
pub struct RoundRobinLoadBalancer {
services: Arc<tokio::sync::RwLock<HashMap<String, (usize, Vec<ServiceInstance>)>>>,
}
impl RoundRobinLoadBalancer {
pub fn new() -> Self {
Self { services: Arc::new(RwLock::new(HashMap::new())) }
}
}
#[async_trait]
impl LoadBalancer for RoundRobinLoadBalancer {
    /// Returns the next healthy instance of `service_id` in rotation order.
    ///
    /// Fails with a not-found error when the service is unknown and with an
    /// internal error when no instance is both running and healthy. The
    /// client address is ignored by this strategy.
    async fn select_instance(&self, service_id: &str, _client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        let mut registry = self.services.write().await;
        let (cursor, pool) = registry
            .get_mut(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        let candidates: Vec<&ServiceInstance> = pool
            .iter()
            .filter(|candidate| candidate.health_status && candidate.status == ServiceStatus::Running)
            .collect();
        if candidates.is_empty() {
            return Err(DockerError::internal("No healthy instances available"));
        }

        // The counter only ever grows; the modulo maps it onto the current
        // number of healthy candidates.
        let chosen = candidates[*cursor % candidates.len()].clone();
        *cursor += 1;
        Ok(chosen)
    }

    /// Overwrites the tracked copy of `instance`, matched by id within its service.
    async fn update_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let (_, pool) = registry
            .get_mut(&instance.service_id)
            .ok_or_else(|| DockerError::not_found("service", instance.service_id.clone()))?;

        match pool.iter_mut().find(|tracked| tracked.id == instance.id) {
            Some(slot) => {
                *slot = instance.clone();
                Ok(())
            }
            None => Err(DockerError::not_found("instance", instance.id.clone())),
        }
    }

    /// Appends `instance` to its service, creating the service entry on first use.
    async fn add_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let (_, pool) = registry.entry(instance.service_id.clone()).or_insert((0, Vec::new()));
        pool.push(instance.clone());
        Ok(())
    }

    /// Removes the first tracked instance whose id equals `instance_id`, searching every service.
    async fn remove_instance(&self, instance_id: &str) -> Result<()> {
        let mut registry = self.services.write().await;
        for (_, pool) in registry.values_mut() {
            if let Some(position) = pool.iter().position(|tracked| tracked.id == instance_id) {
                pool.remove(position);
                return Ok(());
            }
        }
        Err(DockerError::not_found("instance", instance_id.to_string()))
    }
}
pub struct RandomLoadBalancer {
services: Arc<tokio::sync::RwLock<HashMap<String, Vec<ServiceInstance>>>>,
}
impl RandomLoadBalancer {
pub fn new() -> Self {
Self { services: Arc::new(tokio::sync::RwLock::new(HashMap::new())) }
}
}
#[async_trait]
impl LoadBalancer for RandomLoadBalancer {
    /// Returns a randomly chosen instance of `service_id` that is running and healthy.
    ///
    /// Fails with a not-found error when the service is unknown and with an
    /// internal error when no instance qualifies. The client address is ignored.
    async fn select_instance(&self, service_id: &str, _client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        let registry = self.services.read().await;
        let pool = match registry.get(service_id) {
            Some(pool) => pool,
            None => return Err(DockerError::not_found("service", service_id.to_string())),
        };

        let candidates: Vec<&ServiceInstance> = pool
            .iter()
            .filter(|candidate| candidate.health_status && candidate.status == ServiceStatus::Running)
            .collect();
        if candidates.is_empty() {
            return Err(DockerError::internal("No healthy instances available"));
        }

        let pick = (rand::random::<u32>() as usize) % candidates.len();
        Ok(candidates[pick].clone())
    }

    /// Overwrites the tracked copy of `instance`, matched by id within its service.
    async fn update_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let pool = match registry.get_mut(&instance.service_id) {
            Some(pool) => pool,
            None => return Err(DockerError::not_found("service", instance.service_id.clone())),
        };

        match pool.iter_mut().find(|tracked| tracked.id == instance.id) {
            Some(slot) => {
                *slot = instance.clone();
                Ok(())
            }
            None => Err(DockerError::not_found("instance", instance.id.clone())),
        }
    }

    /// Appends `instance` to its service, creating the service entry on first use.
    async fn add_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let pool = registry.entry(instance.service_id.clone()).or_insert_with(Vec::new);
        pool.push(instance.clone());
        Ok(())
    }

    /// Removes the first tracked instance whose id equals `instance_id`, searching every service.
    async fn remove_instance(&self, instance_id: &str) -> Result<()> {
        let mut registry = self.services.write().await;
        for pool in registry.values_mut() {
            if let Some(position) = pool.iter().position(|tracked| tracked.id == instance_id) {
                pool.remove(position);
                return Ok(());
            }
        }
        Err(DockerError::not_found("instance", instance_id.to_string()))
    }
}
pub struct LeastConnectionsLoadBalancer {
services: Arc<tokio::sync::RwLock<HashMap<String, Vec<ServiceInstance>>>>,
connections: Arc<tokio::sync::RwLock<HashMap<String, usize>>>, }
impl LeastConnectionsLoadBalancer {
pub fn new() -> Self {
Self {
services: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
connections: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
}
}
}
#[async_trait]
impl LoadBalancer for LeastConnectionsLoadBalancer {
    /// Returns the running, healthy instance of `service_id` with the fewest
    /// tracked connections and increments that instance's counter.
    ///
    /// Ties go to the instance that was added first. Fails with a not-found
    /// error when the service is unknown and with an internal error when no
    /// instance qualifies. The client address is ignored.
    async fn select_instance(&self, service_id: &str, _client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        let services = self.services.read().await;
        // Take the write guard once and use it for both the comparison and the
        // increment. Acquiring a read guard first and then awaiting `write()`
        // on the same lock while that read guard is still alive would never
        // complete, because the writer waits for every reader to be released.
        // Lock order (`services`, then `connections`) matches `remove_instance`.
        let mut connections = self.connections.write().await;

        let instances = services
            .get(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        // `min_by_key` returns the first minimum, so earlier instances win ties.
        let selected = instances
            .iter()
            .filter(|inst| inst.status == ServiceStatus::Running && inst.health_status)
            .min_by_key(|inst| connections.get(&inst.id).copied().unwrap_or(0))
            .ok_or_else(|| DockerError::internal("No healthy instances available"))?;

        *connections.entry(selected.id.clone()).or_insert(0) += 1;
        Ok(selected.clone())
    }

    /// Overwrites the tracked copy of `instance`, matched by id within its service.
    async fn update_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut services = self.services.write().await;
        let instances = services
            .get_mut(&instance.service_id)
            .ok_or_else(|| DockerError::not_found("service", instance.service_id.clone()))?;

        match instances.iter_mut().find(|inst| inst.id == instance.id) {
            Some(slot) => {
                *slot = instance.clone();
                Ok(())
            }
            None => Err(DockerError::not_found("instance", instance.id.clone())),
        }
    }

    /// Appends `instance` to its service, creating the service entry on first use.
    async fn add_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut services = self.services.write().await;
        services.entry(instance.service_id.clone()).or_default().push(instance.clone());
        Ok(())
    }

    /// Drops the connection counter for `instance_id` and removes the first
    /// tracked instance with that id, searching every service.
    ///
    /// The counter is dropped even when no tracked instance matches, in which
    /// case a not-found error is returned.
    async fn remove_instance(&self, instance_id: &str) -> Result<()> {
        let mut services = self.services.write().await;
        let mut connections = self.connections.write().await;
        connections.remove(instance_id);

        for instances in services.values_mut() {
            if let Some(idx) = instances.iter().position(|inst| inst.id == instance_id) {
                instances.remove(idx);
                return Ok(());
            }
        }
        Err(DockerError::not_found("instance", instance_id.to_string()))
    }
}
/// Decrements the connection counter that `load_balancer` keeps for `instance_id`.
///
/// The counter never goes below zero. Returns a not-found error when no
/// counter exists for the instance.
pub async fn decrease_connections(load_balancer: &LeastConnectionsLoadBalancer, instance_id: &str) -> Result<()> {
    let mut counters = load_balancer.connections.write().await;
    match counters.get_mut(instance_id) {
        Some(active) => {
            *active = active.saturating_sub(1);
            Ok(())
        }
        None => Err(DockerError::not_found("instance", instance_id.to_string())),
    }
}
pub struct IpHashLoadBalancer {
services: Arc<tokio::sync::RwLock<HashMap<String, Vec<ServiceInstance>>>>,
}
impl IpHashLoadBalancer {
pub fn new() -> Self {
Self { services: Arc::new(tokio::sync::RwLock::new(HashMap::new())) }
}
fn hash_ip(&self, ip: &IpAddr) -> u64 {
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
let mut hasher = DefaultHasher::new();
ip.hash(&mut hasher);
hasher.finish()
}
}
#[async_trait]
impl LoadBalancer for IpHashLoadBalancer {
    /// Returns a running, healthy instance of `service_id`.
    ///
    /// With a client address, the instance is chosen by the address hash modulo
    /// the number of healthy instances; without one, the choice is random.
    /// Fails with a not-found error when the service is unknown and with an
    /// internal error when no instance qualifies.
    async fn select_instance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        let registry = self.services.read().await;
        let pool = match registry.get(service_id) {
            Some(pool) => pool,
            None => return Err(DockerError::not_found("service", service_id.to_string())),
        };

        let candidates: Vec<&ServiceInstance> = pool
            .iter()
            .filter(|candidate| candidate.health_status && candidate.status == ServiceStatus::Running)
            .collect();
        if candidates.is_empty() {
            return Err(DockerError::internal("No healthy instances available"));
        }

        let slot = match client_ip {
            Some(ip) => (self.hash_ip(&ip) % candidates.len() as u64) as usize,
            None => (rand::random::<u32>() as usize) % candidates.len(),
        };
        Ok(candidates[slot].clone())
    }

    /// Overwrites the tracked copy of `instance`, matched by id within its service.
    async fn update_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let pool = match registry.get_mut(&instance.service_id) {
            Some(pool) => pool,
            None => return Err(DockerError::not_found("service", instance.service_id.clone())),
        };

        match pool.iter_mut().find(|tracked| tracked.id == instance.id) {
            Some(entry) => {
                *entry = instance.clone();
                Ok(())
            }
            None => Err(DockerError::not_found("instance", instance.id.clone())),
        }
    }

    /// Appends `instance` to its service, creating the service entry on first use.
    async fn add_instance(&self, instance: &ServiceInstance) -> Result<()> {
        let mut registry = self.services.write().await;
        let pool = registry.entry(instance.service_id.clone()).or_insert_with(Vec::new);
        pool.push(instance.clone());
        Ok(())
    }

    /// Removes the first tracked instance whose id equals `instance_id`, searching every service.
    async fn remove_instance(&self, instance_id: &str) -> Result<()> {
        let mut registry = self.services.write().await;
        for pool in registry.values_mut() {
            if let Some(position) = pool.iter().position(|tracked| tracked.id == instance_id) {
                pool.remove(position);
                return Ok(());
            }
        }
        Err(DockerError::not_found("instance", instance_id.to_string()))
    }
}
pub struct ServiceDiscoveryManager {
services: Arc<tokio::sync::RwLock<HashMap<String, ServiceInfo>>>,
service_name_map: Arc<tokio::sync::RwLock<HashMap<String, String>>>, load_balancer: Arc<dyn LoadBalancer>,
network_manager: Arc<dyn NetworkManager>,
}
impl ServiceDiscoveryManager {
pub fn new(network_manager: Arc<dyn NetworkManager>) -> Self {
Self {
services: Arc::new(RwLock::new(HashMap::new())),
service_name_map: Arc::new(RwLock::new(HashMap::new())),
load_balancer: Arc::new(RoundRobinLoadBalancer::new()),
network_manager,
}
}
pub fn create_load_balancer(strategy: LoadBalancingStrategy) -> Arc<dyn LoadBalancer> {
match strategy {
LoadBalancingStrategy::RoundRobin => Arc::new(RoundRobinLoadBalancer::new()),
LoadBalancingStrategy::Random => Arc::new(RandomLoadBalancer::new()),
LoadBalancingStrategy::LeastConnections => Arc::new(LeastConnectionsLoadBalancer::new()),
LoadBalancingStrategy::IpHash => Arc::new(IpHashLoadBalancer::new()),
}
}
async fn perform_health_check(instance: &ServiceInstance, service_info: &ServiceInfo) -> bool {
if let Some(health_check_path) = &service_info.health_check_path {
Self::perform_http_health_check(instance, health_check_path).await
}
else {
Self::perform_tcp_health_check(instance).await
}
}
async fn perform_http_health_check(instance: &ServiceInstance, health_check_path: &str) -> bool {
let url = format!("http://{}{}", instance.address, health_check_path);
match wae_request::get(&url).send().await {
Ok(response) => response.is_success(),
Err(_) => false,
}
}
async fn perform_tcp_health_check(instance: &ServiceInstance) -> bool {
use tokio::net::TcpStream;
match TcpStream::connect(instance.address).await {
Ok(_) => true,
Err(_) => false,
}
}
}
#[async_trait]
impl ServiceDiscovery for ServiceDiscoveryManager {
    /// Registers a service built from `config` under a fresh UUID v4 id.
    ///
    /// Registering a name that is already in use re-points the name at the new
    /// service; the earlier service stays in the registry and remains
    /// reachable by id.
    async fn register_service(&self, config: &ServiceConfig) -> Result<ServiceInfo> {
        let service_id = Uuid::new_v4().to_string();
        let now = Utc::now();
        let service_info = ServiceInfo {
            id: service_id.clone(),
            name: config.name.clone(),
            port: config.port,
            network_id: config.network_id.clone(),
            load_balancing_strategy: config.load_balancing_strategy.clone(),
            health_check_path: config.health_check_path.clone(),
            health_check_interval: config.health_check_interval,
            instances: Vec::new(),
            labels: config.labels.clone(),
            created_at: now,
            updated_at: now,
        };

        // Lock order everywhere in this impl: `services` first, then `service_name_map`.
        let mut services = self.services.write().await;
        let mut service_name_map = self.service_name_map.write().await;
        services.insert(service_id.clone(), service_info.clone());
        service_name_map.insert(config.name.clone(), service_id);
        Ok(service_info)
    }

    /// Removes the service `service_id` from the registry.
    ///
    /// The name mapping is removed only when it still points at this service,
    /// so deregistering a service whose name was taken over by a later
    /// registration does not make the later service undiscoverable.
    ///
    /// NOTE(review): instances already handed to the load balancer are not
    /// removed from it here — confirm whether that is intended.
    async fn deregister_service(&self, service_id: &str) -> Result<()> {
        let mut services = self.services.write().await;
        let mut service_name_map = self.service_name_map.write().await;

        let removed = services
            .remove(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        let name_points_here = service_name_map.get(&removed.name).map(String::as_str) == Some(service_id);
        if name_points_here {
            service_name_map.remove(&removed.name);
        }
        Ok(())
    }

    /// Adds `instance` to the service `service_id` and to the load balancer,
    /// returning a copy of the instance.
    async fn register_instance(&self, service_id: &str, instance: &ServiceInstance) -> Result<ServiceInstance> {
        let mut services = self.services.write().await;
        let service_info = services
            .get_mut(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        service_info.instances.push(instance.clone());
        service_info.updated_at = Utc::now();
        self.load_balancer.add_instance(instance).await?;
        Ok(instance.clone())
    }

    /// Removes the instance `instance_id` from the service `service_id` and
    /// from the load balancer.
    async fn deregister_instance(&self, service_id: &str, instance_id: &str) -> Result<()> {
        let mut services = self.services.write().await;
        let service_info = services
            .get_mut(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        let idx = service_info
            .instances
            .iter()
            .position(|inst| inst.id == instance_id)
            .ok_or_else(|| DockerError::not_found("instance", instance_id.to_string()))?;

        service_info.instances.remove(idx);
        service_info.updated_at = Utc::now();
        self.load_balancer.remove_instance(instance_id).await?;
        Ok(())
    }

    /// Looks a service up by name and returns a copy of its record.
    async fn discover_service(&self, service_name: &str) -> Result<ServiceInfo> {
        // Acquire the locks in the same order as the writers (`services`, then
        // `service_name_map`). Taking them in the opposite order can deadlock
        // against a concurrent register/deregister, each side holding the lock
        // the other one is waiting for.
        let services = self.services.read().await;
        let service_name_map = self.service_name_map.read().await;

        let service_id = service_name_map
            .get(service_name)
            .ok_or_else(|| DockerError::not_found("service", service_name.to_string()))?;

        services
            .get(service_id)
            .cloned()
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))
    }

    /// Returns a copy of every registered service record.
    async fn list_services(&self) -> Result<Vec<ServiceInfo>> {
        let services = self.services.read().await;
        Ok(services.values().cloned().collect())
    }

    /// Probes every instance of `service_id`, stores the result on the
    /// instance and forwards the updated instance to the load balancer.
    ///
    /// Stops at the first load-balancer update that fails.
    ///
    /// NOTE(review): the write lock on `services` is held while the network
    /// probes run, so other registry operations wait until the probes finish.
    async fn health_check(&self, service_id: &str) -> Result<()> {
        let mut services = self.services.write().await;
        let service_info = services
            .get_mut(service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))?;

        // The probe needs the service settings while the instances are
        // borrowed mutably, hence the snapshot.
        let snapshot = service_info.clone();
        for instance in &mut service_info.instances {
            instance.health_status = ServiceDiscoveryManager::perform_health_check(instance, &snapshot).await;
            instance.updated_at = Utc::now();
            self.load_balancer.update_instance(instance).await?;
        }
        Ok(())
    }

    /// Delegates instance selection for `service_id` to the load balancer.
    async fn load_balance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        self.load_balancer.select_instance(service_id, client_ip).await
    }
}
/// Default [`ServiceManager`] implementation, backed by a [`ServiceDiscovery`] registry.
pub struct ServiceManagerImpl {
    /// Registry that stores services and instances and performs load balancing.
    service_discovery: Arc<dyn ServiceDiscovery>,
    /// Network manager handle; stored but not used by the code in this file.
    network_manager: Arc<dyn NetworkManager>,
}

impl ServiceManagerImpl {
    /// Creates a manager whose registry is a fresh [`ServiceDiscoveryManager`].
    pub fn new(network_manager: Arc<dyn NetworkManager>) -> Self {
        let registry = ServiceDiscoveryManager::new(network_manager.clone());
        Self { service_discovery: Arc::new(registry), network_manager }
    }

    /// POSTs `request` to `http://<address>/` and returns the response body.
    ///
    /// A failed request is reported as an internal error.
    async fn perform_http_service_call(instance: &ServiceInstance, request: Vec<u8>) -> Result<Vec<u8>> {
        let endpoint = format!("http://{}/", instance.address);
        let outcome = wae_request::post(&endpoint).body(request).send().await;
        match outcome {
            Ok(reply) => Ok(reply.body),
            Err(e) => Err(DockerError::internal(format!("HTTP service call failed: {}", e))),
        }
    }

    /// Writes `request` to a new TCP connection and returns everything read
    /// until the peer closes the stream.
    ///
    /// Connect, write and read failures are each reported as an internal error.
    async fn perform_tcp_service_call(instance: &ServiceInstance, request: Vec<u8>) -> Result<Vec<u8>> {
        use tokio::{
            io::{AsyncReadExt, AsyncWriteExt},
            net::TcpStream,
        };

        let mut socket = match TcpStream::connect(instance.address).await {
            Ok(socket) => socket,
            Err(e) => return Err(DockerError::internal(format!("TCP connection failed: {}", e))),
        };

        let write_outcome = socket.write_all(&request).await;
        if let Err(e) = write_outcome {
            return Err(DockerError::internal(format!("Failed to send TCP request: {}", e)));
        }

        let mut reply = Vec::new();
        let read_outcome = socket.read_to_end(&mut reply).await;
        match read_outcome {
            Ok(_) => Ok(reply),
            Err(e) => Err(DockerError::internal(format!("Failed to read TCP response: {}", e))),
        }
    }
}
#[async_trait]
impl ServiceManager for ServiceManagerImpl {
    /// Registers a new service described by `config`.
    async fn create_service(&self, config: &ServiceConfig) -> Result<ServiceInfo> {
        self.service_discovery.register_service(config).await
    }

    /// Deregisters the service with the given id.
    async fn delete_service(&self, service_id: &str) -> Result<()> {
        self.service_discovery.deregister_service(service_id).await
    }

    /// Creates an instance with a fresh UUID v4 id, marked running and
    /// healthy, and registers it under `service_id`.
    async fn add_service_instance(
        &self,
        service_id: &str,
        container_id: &str,
        container_name: &str,
        address: SocketAddr,
    ) -> Result<ServiceInstance> {
        let instance = ServiceInstance {
            id: Uuid::new_v4().to_string(),
            service_id: service_id.to_string(),
            container_id: container_id.to_string(),
            container_name: container_name.to_string(),
            address,
            status: ServiceStatus::Running,
            health_status: true,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        self.service_discovery.register_instance(service_id, &instance).await
    }

    /// Deregisters the instance `instance_id` of the service `service_id`.
    async fn remove_service_instance(&self, service_id: &str, instance_id: &str) -> Result<()> {
        self.service_discovery.deregister_instance(service_id, instance_id).await
    }

    /// Returns every registered service.
    async fn list_services(&self) -> Result<Vec<ServiceInfo>> {
        self.service_discovery.list_services().await
    }

    /// Returns the record of the service with the given id.
    ///
    /// The lookup lists every service and filters by id, because the
    /// [`ServiceDiscovery`] trait exposes no lookup by id.
    async fn inspect_service(&self, service_id: &str) -> Result<ServiceInfo> {
        let services = self.service_discovery.list_services().await?;
        services
            .into_iter()
            .find(|service| service.id == service_id)
            .ok_or_else(|| DockerError::not_found("service", service_id.to_string()))
    }

    /// Sends `request` over HTTP to an instance of the service named
    /// `target_service_name` and returns the response body.
    ///
    /// Up to three attempts are made; each attempt selects an instance through
    /// the load balancer again. Failed attempts are separated by a growing
    /// pause (100 ms, then 200 ms). There is no pause after the final attempt,
    /// so the last error is returned without extra delay. The source service
    /// id is currently unused.
    async fn service_to_service_call(
        &self,
        _source_service_id: &str,
        target_service_name: &str,
        request: Vec<u8>,
    ) -> Result<Vec<u8>> {
        const MAX_ATTEMPTS: u64 = 3;
        const BACKOFF_STEP_MS: u64 = 100;

        let target_service = self.service_discovery.discover_service(target_service_name).await?;
        let mut last_error: Option<DockerError> = None;

        for attempt in 1..=MAX_ATTEMPTS {
            let outcome = match self.service_discovery.load_balance(&target_service.id, None).await {
                Ok(instance) => Self::perform_http_service_call(&instance, request.clone()).await,
                Err(e) => Err(e),
            };
            match outcome {
                Ok(response) => return Ok(response),
                Err(e) => last_error = Some(e),
            }
            // Back off only when another attempt follows.
            if attempt < MAX_ATTEMPTS {
                tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_STEP_MS * attempt)).await;
            }
        }

        Err(last_error.unwrap_or_else(|| DockerError::internal("Service call failed after multiple attempts")))
    }

    /// Delegates instance selection for `service_id` to the registry.
    async fn load_balance(&self, service_id: &str, client_ip: Option<IpAddr>) -> Result<ServiceInstance> {
        self.service_discovery.load_balance(service_id, client_ip).await
    }
}
/// Builds the default [`ServiceManager`] ([`ServiceManagerImpl`]) as a boxed trait object.
pub fn new_service_manager(network_manager: Arc<dyn NetworkManager>) -> Box<dyn ServiceManager> {
    let manager = ServiceManagerImpl::new(network_manager);
    Box::new(manager)
}
/// Builds the default [`ServiceDiscovery`] ([`ServiceDiscoveryManager`]) as a boxed trait object.
pub fn new_service_discovery(network_manager: Arc<dyn NetworkManager>) -> Box<dyn ServiceDiscovery> {
    let registry = ServiceDiscoveryManager::new(network_manager);
    Box::new(registry)
}
/// Builds a boxed [`RoundRobinLoadBalancer`] that tracks no services yet.
pub fn new_round_robin_load_balancer() -> Box<dyn LoadBalancer> {
    let balancer = RoundRobinLoadBalancer::new();
    Box::new(balancer)
}
/// Builds a boxed [`RandomLoadBalancer`] that tracks no services yet.
pub fn new_random_load_balancer() -> Box<dyn LoadBalancer> {
    let balancer = RandomLoadBalancer::new();
    Box::new(balancer)
}