use crate::errors::{Error, Result};
use crate::types::InstanceStatus;
use super::{DiscoveryEventHandler, ServiceDiscovery, ServiceInstance};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Discovery backend configured with a gateway URL.
///
/// The URL is normalised once at construction time (see [`PushDiscovery::new`]).
pub struct PushDiscovery {
    /// Gateway URL with at most one trailing `/` removed. Nothing in the code
    /// visible here reads it back, hence the leading underscore.
    _gateway_url: String,
}

impl PushDiscovery {
    /// Creates a `PushDiscovery` for `gateway_url`.
    ///
    /// If the URL ends with `/`, exactly one trailing slash is removed
    /// (`"http://gw//"` becomes `"http://gw/"`); otherwise it is stored as-is.
    pub fn new(gateway_url: impl Into<String>) -> Self {
        let mut normalized: String = gateway_url.into();
        // `strip_suffix` removes a single occurrence; keep only that prefix.
        if let Some(kept_len) = normalized.strip_suffix('/').map(str::len) {
            normalized.truncate(kept_len);
        }
        Self {
            _gateway_url: normalized,
        }
    }
}
/// `ServiceDiscovery` implementation for [`PushDiscovery`].
///
/// Every method here is a no-op that reports success: none of them touches the
/// stored gateway URL or any other state.
#[async_trait]
impl ServiceDiscovery for PushDiscovery {
    /// Always returns an empty list, regardless of `_service_name`.
    async fn discover(&self, _service_name: &str) -> Result<Vec<ServiceInstance>> {
        Ok(vec![])
    }

    /// Accepts the handler and returns `Ok(())` without ever invoking it.
    async fn watch(
        &self,
        _service_name: &str,
        _handler: Box<dyn DiscoveryEventHandler>,
    ) -> Result<()> {
        Ok(())
    }

    /// Returns `Ok(())` without recording `_instance`.
    async fn register(&self, _instance: &ServiceInstance) -> Result<()> {
        Ok(())
    }

    /// Returns `Ok(())`; there is no stored state to remove from.
    async fn deregister(&self, _instance_id: &str) -> Result<()> {
        Ok(())
    }

    /// Returns `Ok(())` and discards the reported status.
    async fn report_health(&self, _instance_id: &str, _status: InstanceStatus) -> Result<()> {
        Ok(())
    }

    /// Returns `Ok(())`; nothing is released.
    async fn close(&self) -> Result<()> {
        Ok(())
    }

    /// Always reports healthy.
    async fn health(&self) -> Result<()> {
        Ok(())
    }
}
/// In-memory registry of service instances.
///
/// Instances are kept in a map guarded by an async `RwLock` and shared through
/// an `Arc`.
pub struct PushHandler {
    /// Registered instances; `register` inserts them under the instance's `id`.
    instances: Arc<RwLock<HashMap<String, PushEntry>>>,
    /// Heartbeat timeout in seconds as given to [`PushHandler::new`]. It is only
    /// stored: nothing in the code visible here reads it.
    _heartbeat_timeout_secs: u64,
}

/// One registry slot: the instance plus the time it was last heard from.
struct PushEntry {
    /// Snapshot of the instance as last registered; its `status` is overwritten
    /// by health reports.
    instance: ServiceInstance,
    /// `chrono::Utc::now().timestamp()` taken at the last register/health
    /// report. Written but not read in the code visible here.
    last_seen: i64,
}

impl PushHandler {
    /// Creates an empty registry that remembers `heartbeat_timeout_secs`.
    pub fn new(heartbeat_timeout_secs: u64) -> Self {
        let registry = RwLock::new(HashMap::new());
        Self {
            instances: Arc::new(registry),
            _heartbeat_timeout_secs: heartbeat_timeout_secs,
        }
    }
}
#[async_trait]
impl ServiceDiscovery for PushHandler {
async fn discover(&self, service_name: &str) -> Result<Vec<ServiceInstance>> {
let instances = self.instances.read().await;
let result: Vec<ServiceInstance> = instances
.values()
.filter(|e| service_name.is_empty() || e.instance.service_name == service_name)
.map(|e| e.instance.clone())
.collect();
Ok(result)
}
async fn watch(
&self,
_service_name: &str,
_handler: Box<dyn DiscoveryEventHandler>,
) -> Result<()> {
Ok(())
}
async fn register(&self, instance: &ServiceInstance) -> Result<()> {
let mut instances = self.instances.write().await;
instances.insert(
instance.id.clone(),
PushEntry {
instance: instance.clone(),
last_seen: chrono::Utc::now().timestamp(),
},
);
Ok(())
}
async fn deregister(&self, instance_id: &str) -> Result<()> {
let mut instances = self.instances.write().await;
instances.remove(instance_id);
Ok(())
}
async fn report_health(&self, instance_id: &str, status: InstanceStatus) -> Result<()> {
let mut instances = self.instances.write().await;
if let Some(entry) = instances.get_mut(instance_id) {
entry.instance.status = status;
entry.last_seen = chrono::Utc::now().timestamp();
Ok(())
} else {
Err(Error::not_found("service instance", instance_id))
}
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn health(&self) -> Result<()> {
Ok(())
}
}