greentic-operator 0.4.43

Greentic operator CLI for local dev and demo orchestration.
Documentation
use std::time::Duration;

use anyhow::Result;
use chrono::Utc;

use crate::operator_log;
use crate::subscriptions_universal::service::{
    ProviderRunner, SubscriptionDeleteRequest, SubscriptionEnsureRequest, SubscriptionRenewRequest,
    SubscriptionService,
};
use crate::subscriptions_universal::store::{SubscriptionState, SubscriptionStore};

const DEFAULT_RENEW_EXTENSION_MS: u64 = 86_400_000;

pub struct Scheduler<R: ProviderRunner> {
    service: SubscriptionService<R>,
    store: SubscriptionStore,
}

impl<R: ProviderRunner> Scheduler<R> {
    pub fn new(service: SubscriptionService<R>, store: SubscriptionStore) -> Self {
        Self { service, store }
    }

    pub fn ensure_once(&self, provider: &str, request: &SubscriptionEnsureRequest) -> Result<()> {
        let state = self.service.ensure_once(provider, request)?;
        self.store.write_state(&state)
    }

    pub fn renew_due(&self, skew: Duration) -> Result<()> {
        let now = Utc::now().timestamp_millis();
        let skew_ms = skew.as_millis() as i64;
        let states = self.store.list_states()?;
        for state in states {
            if let Some(expiration) = state.expiration_unix_ms {
                let renew_at = expiration.saturating_sub(skew_ms);
                if now >= renew_at
                    && let Err(err) = self.renew_binding(&state)
                {
                    operator_log::error(
                        module_path!(),
                        format!(
                            "subscription renew failed binding={} provider={} err={}",
                            state.binding_id, state.provider, err
                        ),
                    );
                }
            }
        }
        Ok(())
    }

    pub fn renew_binding(&self, state: &SubscriptionState) -> Result<()> {
        let request = SubscriptionRenewRequest {
            binding_id: state.binding_id.clone(),
            subscription_id: state.subscription_id.clone(),
            user: state.user.clone(),
            resource: state.resource.clone(),
            change_types: state.change_types.clone(),
            expiration_target_unix_ms: Some(next_expiration_target(state)),
        };
        let renewed = self.service.renew_once(&state.provider, &request)?;
        self.store.write_state(&renewed)
    }

    pub fn delete_binding(&self, state: &SubscriptionState) -> Result<()> {
        let request = SubscriptionDeleteRequest {
            binding_id: state.binding_id.clone(),
            subscription_id: state.subscription_id.clone(),
            user: state.user.clone(),
        };
        self.service.delete_once(&state.provider, &request)?;
        self.store.delete_state(state)
    }
}

fn next_expiration_target(state: &SubscriptionState) -> u64 {
    let now_ms = Utc::now().timestamp_millis();
    let base = state
        .expiration_unix_ms
        .filter(|ms| *ms > 0)
        .map(|ms| ms as u64)
        .unwrap_or_else(|| now_ms as u64);
    base + DEFAULT_RENEW_EXTENSION_MS
}