use crate::core::error::Result;
use crate::core::remote::CloudProvider;
use crate::core::resources::ResourceSpec;
use crate::deployment::tracker::{DeploymentTracker, DeploymentType};
use blueprint_core::{error, info, warn};
use blueprint_std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
type TtlRegistry = Arc<RwLock<HashMap<(u64, u64), DateTime<Utc>>>>;
#[derive(Debug, Clone)]
pub struct RemoteDeploymentConfig {
pub deployment_type: DeploymentType,
pub provider: Option<CloudProvider>,
pub region: Option<String>,
pub instance_id: String,
pub resource_spec: ResourceSpec,
pub ttl_seconds: Option<u64>,
pub deployed_at: DateTime<Utc>,
}
pub struct RemoteDeploymentRegistry {
deployments: Arc<RwLock<HashMap<(u64, u64), RemoteDeploymentConfig>>>,
tracker: Arc<DeploymentTracker>,
}
impl RemoteDeploymentRegistry {
pub fn new(tracker: Arc<DeploymentTracker>) -> Self {
Self {
deployments: Arc::new(RwLock::new(HashMap::new())),
tracker,
}
}
pub async fn register(
&self,
blueprint_id: u64,
service_id: u64,
config: RemoteDeploymentConfig,
) {
let mut deployments = self.deployments.write().await;
deployments.insert((blueprint_id, service_id), config);
info!(
"Registered remote deployment for blueprint {} service {}",
blueprint_id, service_id
);
}
pub async fn get(&self, blueprint_id: u64, service_id: u64) -> Option<RemoteDeploymentConfig> {
let deployments = self.deployments.read().await;
deployments.get(&(blueprint_id, service_id)).cloned()
}
pub async fn cleanup(&self, blueprint_id: u64, service_id: u64) -> Result<()> {
let mut deployments = self.deployments.write().await;
if let Some(config) = deployments.remove(&(blueprint_id, service_id)) {
info!(
"Cleaning up remote deployment {} for blueprint {} service {}",
config.instance_id, blueprint_id, service_id
);
if let Err(e) = self.tracker.handle_termination(&config.instance_id).await {
warn!("Failed to cleanup deployment in tracker: {}", e);
}
}
Ok(())
}
}
pub struct TtlManager {
registry: Arc<RemoteDeploymentRegistry>,
ttl_registry: TtlRegistry,
expiry_tx: tokio::sync::mpsc::UnboundedSender<(u64, u64)>,
}
impl TtlManager {
pub fn new(
registry: Arc<RemoteDeploymentRegistry>,
expiry_tx: tokio::sync::mpsc::UnboundedSender<(u64, u64)>,
) -> Self {
Self {
registry,
ttl_registry: Arc::new(RwLock::new(HashMap::new())),
expiry_tx,
}
}
pub async fn register_ttl(&self, blueprint_id: u64, service_id: u64, ttl_seconds: u64) {
let expiry = Utc::now() + chrono::Duration::seconds(ttl_seconds as i64);
let mut registry = self.ttl_registry.write().await;
registry.insert((blueprint_id, service_id), expiry);
info!(
"Registered TTL for blueprint {} service {}: expires at {}",
blueprint_id, service_id, expiry
);
}
pub async fn unregister_ttl(&self, blueprint_id: u64, service_id: u64) {
let mut registry = self.ttl_registry.write().await;
registry.remove(&(blueprint_id, service_id));
info!(
"Unregistered TTL for blueprint {} service {}",
blueprint_id, service_id
);
}
pub async fn check_expired_services(&self) -> Result<Vec<(u64, u64)>> {
let now = Utc::now();
let registry = self.ttl_registry.read().await;
let expired: Vec<(u64, u64)> = registry
.iter()
.filter(|(_, expiry)| now >= **expiry)
.map(|(id, _)| *id)
.collect();
drop(registry);
let mut cleaned = Vec::new();
for (blueprint_id, service_id) in expired {
info!(
"TTL expired for blueprint {} service {}",
blueprint_id, service_id
);
if let Some(deployment_config) = self.registry.get(blueprint_id, service_id).await {
info!(
"Cleaning up expired deployment: {} (provider: {:?})",
deployment_config.instance_id, deployment_config.provider
);
if let Err(e) = self.registry.cleanup(blueprint_id, service_id).await {
warn!("Failed to cleanup deployment from registry: {}", e);
}
}
if self.expiry_tx.send((blueprint_id, service_id)).is_ok() {
cleaned.push((blueprint_id, service_id));
let mut registry = self.ttl_registry.write().await;
registry.remove(&(blueprint_id, service_id));
}
}
Ok(cleaned)
}
pub async fn get_active_ttl_count(&self) -> usize {
let ttl_registry = self.ttl_registry.read().await;
let mut active_count = 0;
for (blueprint_id, service_id) in ttl_registry.keys() {
if self
.registry
.get(*blueprint_id, *service_id)
.await
.is_some()
{
active_count += 1;
}
}
active_count
}
pub async fn sync_with_deployment_registry(&self) -> Result<usize> {
let ttl_entries: Vec<(u64, u64)> = {
let ttl_registry = self.ttl_registry.read().await;
ttl_registry.keys().cloned().collect()
};
let mut orphaned_count = 0;
for (blueprint_id, service_id) in ttl_entries {
if self.registry.get(blueprint_id, service_id).await.is_none() {
info!(
"Removing orphaned TTL entry for blueprint {} service {}",
blueprint_id, service_id
);
let mut ttl_registry = self.ttl_registry.write().await;
ttl_registry.remove(&(blueprint_id, service_id));
orphaned_count += 1;
}
}
Ok(orphaned_count)
}
}
pub async fn handle_service_shutdown(
blueprint_id: u64,
service_id: u64,
registry: &RemoteDeploymentRegistry,
) -> Result<()> {
if let Some(config) = registry.get(blueprint_id, service_id).await {
info!(
"Performing remote cleanup for deployment {}",
config.instance_id
);
registry.cleanup(blueprint_id, service_id).await?;
}
Ok(())
}
pub struct RemoteEventHandler {
registry: Arc<RemoteDeploymentRegistry>,
ttl_manager: Option<Arc<TtlManager>>,
}
impl RemoteEventHandler {
pub fn new(registry: Arc<RemoteDeploymentRegistry>) -> Self {
Self {
registry,
ttl_manager: None,
}
}
pub fn with_ttl_manager(mut self, ttl_manager: Arc<TtlManager>) -> Self {
self.ttl_manager = Some(ttl_manager);
self
}
pub async fn on_service_initiated(
&self,
blueprint_id: u64,
service_id: u64,
config: Option<RemoteDeploymentConfig>,
) -> Result<()> {
if let Some(config) = config {
self.registry
.register(blueprint_id, service_id, config.clone())
.await;
if let Some(ttl_seconds) = config.ttl_seconds {
if let Some(ttl_manager) = &self.ttl_manager {
ttl_manager
.register_ttl(blueprint_id, service_id, ttl_seconds)
.await;
}
}
}
Ok(())
}
pub async fn on_service_terminated(&self, blueprint_id: u64, service_id: u64) -> Result<()> {
handle_service_shutdown(blueprint_id, service_id, &self.registry).await
}
pub async fn on_ttl_expired(&self, blueprint_id: u64, service_id: u64) -> Result<()> {
info!(
"Handling TTL expiry for blueprint {} service {}",
blueprint_id, service_id
);
self.on_service_terminated(blueprint_id, service_id).await
}
}
pub async fn ttl_checking_task(
ttl_manager: Arc<TtlManager>,
check_interval: blueprint_std::time::Duration,
) {
let mut interval = tokio::time::interval(check_interval);
loop {
interval.tick().await;
match ttl_manager.check_expired_services().await {
Ok(expired) if !expired.is_empty() => {
info!("Found {} services with expired TTL", expired.len());
}
Err(e) => {
error!("TTL check failed: {}", e);
}
_ => {}
}
}
}
pub struct RemoteSourceExtension {
registry: Arc<RemoteDeploymentRegistry>,
provisioner: Arc<crate::infra::CloudProvisioner>,
}
impl RemoteSourceExtension {
pub fn new(
registry: Arc<RemoteDeploymentRegistry>,
provisioner: Arc<crate::infra::CloudProvisioner>,
) -> Self {
Self {
registry,
provisioner,
}
}
pub async fn spawn_remote(
&self,
blueprint_id: u64,
service_id: u64,
resource_spec: ResourceSpec,
provider: CloudProvider,
region: String,
ttl_seconds: Option<u64>,
) -> Result<RemoteDeploymentConfig> {
let _config = crate::providers::common::ProvisioningConfig {
name: format!("{blueprint_id}-{service_id}"),
region: region.clone(),
..Default::default()
};
let instance = self
.provisioner
.provision(CloudProvider::AWS, &resource_spec, "default")
.await?;
let config = RemoteDeploymentConfig {
deployment_type: deployment_type_from_provider(&provider),
provider: Some(provider),
region: Some(region),
instance_id: instance.id,
resource_spec,
ttl_seconds,
deployed_at: Utc::now(),
};
self.registry
.register(blueprint_id, service_id, config.clone())
.await;
Ok(config)
}
}
fn deployment_type_from_provider(provider: &CloudProvider) -> DeploymentType {
match provider {
CloudProvider::AWS => DeploymentType::AwsEc2,
CloudProvider::GCP => DeploymentType::GcpGce,
CloudProvider::Azure => DeploymentType::AzureVm,
CloudProvider::DigitalOcean => DeploymentType::DigitalOceanDroplet,
CloudProvider::Vultr => DeploymentType::VultrInstance,
_ => DeploymentType::SshRemote,
}
}
pub struct RemoteDeploymentExtensions {
pub registry: Arc<RemoteDeploymentRegistry>,
pub event_handler: Arc<RemoteEventHandler>,
pub ttl_manager: Option<Arc<TtlManager>>,
pub source_extension: Arc<RemoteSourceExtension>,
}
impl RemoteDeploymentExtensions {
pub async fn initialize(
state_dir: &std::path::Path,
enable_ttl: bool,
provisioner: Arc<crate::infra::CloudProvisioner>,
) -> Result<Self> {
let tracker = Arc::new(DeploymentTracker::new(state_dir).await?);
let registry = Arc::new(RemoteDeploymentRegistry::new(tracker.clone()));
let ttl_manager = if enable_ttl {
let (ttl_tx, mut ttl_rx) = tokio::sync::mpsc::unbounded_channel();
let ttl_manager = Arc::new(TtlManager::new(registry.clone(), ttl_tx));
let ttl_manager_clone = ttl_manager.clone();
tokio::spawn(async move {
ttl_checking_task(
ttl_manager_clone,
blueprint_std::time::Duration::from_secs(60),
)
.await;
});
let registry_clone = registry.clone();
tokio::spawn(async move {
while let Some((blueprint_id, service_id)) = ttl_rx.recv().await {
if let Err(e) =
handle_service_shutdown(blueprint_id, service_id, ®istry_clone).await
{
error!("Failed to handle TTL expiry: {}", e);
}
}
});
Some(ttl_manager)
} else {
None
};
let mut event_handler = RemoteEventHandler::new(registry.clone());
if let Some(ttl_mgr) = &ttl_manager {
event_handler = event_handler.with_ttl_manager(ttl_mgr.clone());
}
let source_extension = Arc::new(RemoteSourceExtension::new(registry.clone(), provisioner));
info!("Initialized remote deployment extensions");
Ok(Self {
registry,
event_handler: Arc::new(event_handler),
ttl_manager,
source_extension,
})
}
pub async fn on_service_removed(&self, blueprint_id: u64, service_id: u64) -> Result<()> {
self.event_handler
.on_service_terminated(blueprint_id, service_id)
.await
}
}
pub struct IntegrationExample;
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_remote_registry() {
let temp_dir = TempDir::new().unwrap();
let tracker = Arc::new(DeploymentTracker::new(temp_dir.path()).await.unwrap());
let registry = RemoteDeploymentRegistry::new(tracker);
let config = RemoteDeploymentConfig {
deployment_type: DeploymentType::AwsEc2,
provider: Some(CloudProvider::AWS),
region: Some("us-east-1".to_string()),
instance_id: "i-1234567890".to_string(),
resource_spec: crate::core::resources::ResourceSpec::basic(),
ttl_seconds: Some(3600),
deployed_at: Utc::now(),
};
registry.register(100, 1, config.clone()).await;
let retrieved = registry.get(100, 1).await;
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().instance_id, "i-1234567890");
registry.cleanup(100, 1).await.unwrap();
assert!(registry.get(100, 1).await.is_none());
}
#[tokio::test]
async fn test_ttl_manager() {
let temp_dir = TempDir::new().unwrap();
let tracker = Arc::new(DeploymentTracker::new(temp_dir.path()).await.unwrap());
let registry = Arc::new(RemoteDeploymentRegistry::new(tracker));
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let ttl_manager = TtlManager::new(registry, tx);
ttl_manager.register_ttl(100, 1, 3600).await;
let ttl_registry = ttl_manager.ttl_registry.read().await;
assert!(ttl_registry.contains_key(&(100, 1)));
drop(ttl_registry);
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn test_event_handler() {
let temp_dir = TempDir::new().unwrap();
let tracker = Arc::new(DeploymentTracker::new(temp_dir.path()).await.unwrap());
let registry = Arc::new(RemoteDeploymentRegistry::new(tracker));
let event_handler = RemoteEventHandler::new(registry.clone());
let config = RemoteDeploymentConfig {
deployment_type: DeploymentType::GcpGce,
provider: Some(CloudProvider::GCP),
region: Some("us-central1".to_string()),
instance_id: "instance-123".to_string(),
resource_spec: crate::core::resources::ResourceSpec::basic(),
ttl_seconds: None,
deployed_at: Utc::now(),
};
event_handler
.on_service_initiated(200, 2, Some(config))
.await
.unwrap();
assert!(registry.get(200, 2).await.is_some());
event_handler.on_service_terminated(200, 2).await.unwrap();
assert!(registry.get(200, 2).await.is_none());
}
}