use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use chrono::{DateTime, Utc};
use crossbeam_utils::atomic::AtomicCell;
use futures_util::pin_mut;
use futures_util::future::{pending, select, Either, Future};
use slab::Slab;
use serde::Deserialize;
use tokio::sync::{mpsc, oneshot};
use crate::{manager, metrics, payload};
use crate::config::Marked;
use crate::metrics::{Metric, MetricType, MetricUnit};
/// Capacity of the bounded channel created for each subscription, i.e. how
/// many updates can be queued for a single link before sending has to wait.
const UPDATE_QUEUE_LEN: usize = 8;
/// Capacity of the bounded channel that carries commands from the agent and
/// its links to the gate.
const COMMAND_QUEUE_LEN: usize = 16;
/// The sending end of the communication between a unit and its links.
///
/// A gate receives commands through a channel, keeps one update sender per
/// subscription and forwards unit updates to them.
#[derive(Debug)]
pub struct Gate {
/// Receiver for the commands (subscribe, suspension) sent to this gate.
commands: mpsc::Receiver<GateCommand>,
/// One entry per subscription; the slab key is the subscription's slot.
updates: Slab<UpdateSender>,
/// Counter that `gate_status` compares against the number of
/// subscriptions; intended to hold the number of suspended ones.
suspended: usize,
/// The most recent status of the unit as seen by this gate.
unit_status: UnitStatus,
/// Shared metrics of the gate, handed out via `metrics`.
metrics: Arc<GateMetrics>,
}
impl Gate {
    /// Creates a new gate together with the agent that can create links to it.
    ///
    /// The gate starts without subscriptions and with the default unit status
    /// and metrics.
    pub fn new() -> (Gate, GateAgent) {
        let (tx, rx) = mpsc::channel(COMMAND_QUEUE_LEN);
        let gate = Gate {
            commands: rx,
            updates: Slab::new(),
            suspended: 0,
            unit_status: Default::default(),
            metrics: Default::default(),
        };
        let agent = GateAgent { commands: tx };
        (gate, agent)
    }

    /// Returns a shared handle to the gate's metrics.
    pub fn metrics(&self) -> Arc<GateMetrics> {
        self.metrics.clone()
    }

    /// Processes incoming commands until the gate status changes.
    ///
    /// Commands are handled one after another. As soon as handling a command
    /// makes the result of `gate_status` differ from what it was when this
    /// method was entered, the new status is returned.
    ///
    /// # Errors
    ///
    /// Returns `Terminated` once the command channel yields `None`, i.e. it
    /// has been closed and no queued commands remain.
    pub async fn process(&mut self) -> Result<GateStatus, Terminated> {
        let status = self.gate_status();
        loop {
            let command = match self.commands.recv().await {
                Some(command) => command,
                None => return Err(Terminated),
            };
            match command {
                GateCommand::Suspension { slot, suspend } => {
                    self.suspension(slot, suspend)
                }
                GateCommand::Subscribe { suspended, response } => {
                    self.subscribe(suspended, response)
                }
            }
            let new_status = self.gate_status();
            if new_status != status {
                return Ok(new_status);
            }
        }
    }

    /// Processes commands until the given future resolves.
    ///
    /// Gate status changes reported by `process` are discarded and command
    /// processing simply continues. The output of `fut` is returned once it
    /// completes.
    ///
    /// # Errors
    ///
    /// Returns `Terminated` if `process` reports termination before `fut`
    /// resolves.
    pub async fn process_until<Fut: Future>(
        &mut self,
        fut: Fut,
    ) -> Result<Fut::Output, Terminated> {
        pin_mut!(fut);
        loop {
            let process = self.process();
            pin_mut!(process);
            match select(process, fut).await {
                Either::Left((Err(_), _)) => return Err(Terminated),
                Either::Left((Ok(_), next_fut)) => {
                    // `select` hands the unfinished future back; keep polling
                    // that same future in the next round.
                    fut = next_fut;
                }
                Either::Right((res, _)) => return Ok(res),
            }
        }
    }

    /// Applies a unit update and forwards it to all unsuspended subscribers.
    ///
    /// Returns `false` without sending anything if the update does not change
    /// the stored unit status. Otherwise a clone of the update is sent to
    /// every subscription that is not suspended, waiting for queue space
    /// where necessary, the metrics are refreshed, and `true` is returned.
    ///
    /// Subscriptions whose receiving end turns out to be gone are removed.
    pub async fn update(&mut self, update: UnitUpdate) -> bool {
        if !self.unit_status.apply(&update) {
            return false;
        }
        for (_, item) in &mut self.updates {
            if item.suspended {
                continue;
            }
            match item.sender.as_mut() {
                Some(sender) => {
                    if sender.send(update.clone()).await.is_ok() {
                        continue;
                    }
                }
                None => continue,
            }
            // Sending failed, so the receiver is gone: mark for removal.
            item.sender = None
        }
        // Only unsuspended entries can have lost their sender above, so the
        // `suspended` counter does not need adjusting here.
        self.updates.retain(|_, item| item.sender.is_some());
        self.metrics.update(&self.unit_status);
        true
    }

    /// Returns the current status of the gate.
    ///
    /// The gate is dormant if every subscription is suspended (which includes
    /// the case of no subscriptions at all) and active otherwise.
    pub fn gate_status(&self) -> GateStatus {
        if self.suspended == self.updates.len() {
            GateStatus::Dormant
        } else {
            GateStatus::Active
        }
    }

    /// Handles a suspension command for the subscription in `slot`.
    ///
    /// Unknown slots are ignored. The `suspended` counter is adjusted only
    /// when the flag actually changes so that it always equals the number of
    /// suspended subscriptions.
    fn suspension(&mut self, slot: usize, suspend: bool) {
        if let Some(item) = self.updates.get_mut(slot) {
            if item.suspended != suspend {
                item.suspended = suspend;
                if suspend {
                    self.suspended += 1;
                } else {
                    self.suspended = self.suspended.saturating_sub(1);
                }
            }
        }
    }

    /// Handles a subscribe command.
    ///
    /// Creates the update channel for the new subscription, registers its
    /// sender and answers through `response` with the slot, the receiver and
    /// a copy of the current unit status. If the requester is no longer
    /// waiting for the response, the registration is rolled back.
    fn subscribe(
        &mut self,
        suspended: bool,
        response: oneshot::Sender<SubscribeResponse>,
    ) {
        let (tx, receiver) = mpsc::channel(UPDATE_QUEUE_LEN);
        let slot = self.updates.insert(UpdateSender {
            sender: Some(tx),
            suspended,
        });
        let subscription = SubscribeResponse {
            slot,
            receiver,
            unit_status: self.unit_status.clone(),
        };
        match response.send(subscription) {
            Ok(()) => {
                // Count the subscription only once it is known to stay.
                if suspended {
                    self.suspended += 1;
                }
            }
            Err(subscription) => {
                self.updates.remove(subscription.slot);
            }
        }
    }
}
/// A cloneable handle to a gate that can create links to it.
#[derive(Clone, Debug)]
pub struct GateAgent {
/// Sender for the gate's command channel; cloned into every created link.
commands: mpsc::Sender<GateCommand>,
}
impl GateAgent {
    /// Creates a new, not yet connected link to the agent's gate.
    ///
    /// The link receives its own clone of the command sender.
    pub fn create_link(&mut self) -> Link {
        let commands = self.commands.clone();
        Link::new(commands)
    }
}
/// Metrics of a gate, stored in atomics so they can be updated through a
/// shared reference.
#[derive(Debug, Default)]
pub struct GateMetrics {
/// The unit health recorded by the last metrics update.
health: AtomicCell<UnitHealth>,
/// The size of the payload set recorded by the last metrics update that
/// had a payload.
count: AtomicUsize,
/// The time of the last metrics update, `None` if there was none yet.
update: AtomicCell<Option<DateTime<Utc>>>,
}
impl GateMetrics {
    /// Records the given unit status in the metrics.
    ///
    /// The item count is only overwritten if the status carries a payload;
    /// the update time is set to now and the health is always stored.
    fn update(&self, status: &UnitStatus) {
        if let Some(payload) = &status.payload {
            let items = payload.set().len();
            self.count.store(items, atomic::Ordering::Relaxed);
        }
        let now = Utc::now();
        self.update.store(Some(now));
        self.health.store(status.health);
    }
}
/// Definitions of the metrics a gate reports.
impl GateMetrics {
/// Text metric carrying the unit's health.
const STATUS_METRIC: Metric = Metric::new(
"unit_status", "the operational status of the unit",
MetricType::Text, MetricUnit::Info
);
/// Gauge carrying the item count stored by the last update.
const COUNT_METRIC: Metric = Metric::new(
"vrps", "the number of VRPs in the last update",
MetricType::Gauge, MetricUnit::Total
);
/// Text metric carrying the time of the last update.
const UPDATE_METRIC: Metric = Metric::new(
"last_update", "the date and time of the last update",
MetricType::Text, MetricUnit::Info
);
/// Gauge carrying the seconds elapsed since the last update.
const UPDATE_AGO_METRIC: Metric = Metric::new(
"since_last_update", "the number of seconds since the last update",
MetricType::Gauge, MetricUnit::Second
);
}
impl metrics::Source for GateMetrics {
    /// Appends the gate's metrics for the unit named `unit_name` to `target`.
    ///
    /// Health and item count are always appended. If no update has been
    /// recorded yet, the last-update metric is reported as `"N/A"` and the
    /// seconds-since-update metric as `-1`.
    fn append(&self, unit_name: &str, target: &mut metrics::Target) {
        let unit = Some(unit_name);

        target.append_simple(&Self::STATUS_METRIC, unit, self.health.load());

        let count = self.count.load(atomic::Ordering::Relaxed);
        target.append_simple(&Self::COUNT_METRIC, unit, count);

        if let Some(last) = self.update.load() {
            target.append_simple(&Self::UPDATE_METRIC, unit, last);
            // Report the age in seconds with millisecond resolution.
            let elapsed = Utc::now().signed_duration_since(last);
            let seconds = (elapsed.num_milliseconds() as f64) / 1000.;
            target.append_simple(&Self::UPDATE_AGO_METRIC, unit, seconds);
        } else {
            target.append_simple(&Self::UPDATE_METRIC, unit, "N/A");
            target.append_simple(&Self::UPDATE_AGO_METRIC, unit, -1);
        }
    }
}
/// The receiving end of the communication with a unit's gate.
///
/// A link can be deserialized from a string; serde converts it through the
/// `From<String>` implementation.
#[derive(Debug, Deserialize)]
#[serde(from = "String")]
pub struct Link {
/// Sender for commands to the gate.
commands: mpsc::Sender<GateCommand>,
/// The state of the connection to the gate.
connection: ConnectionStatus,
/// The unit status as last seen by this link.
unit_status: UnitStatus,
/// Whether the link is currently suspended.
suspended: bool,
}
/// The state of a link's connection to its gate.
#[derive(Debug)]
enum ConnectionStatus {
/// No subscription has been attempted yet.
Unconnected,
/// The link is subscribed and holds the subscription's data.
Active(LinkConnection),
/// Subscribing failed or the update channel was closed.
Gone
}
/// The data of an established subscription of a link.
#[derive(Debug)]
struct LinkConnection {
/// The slot the gate assigned to this subscription.
slot: usize,
/// Receiver for the updates sent by the gate.
updates: UpdateReceiver,
}
impl Link {
fn new(commands: mpsc::Sender<GateCommand>) -> Self {
Link {
commands,
connection: ConnectionStatus::Unconnected,
unit_status: Default::default(),
suspended: false,
}
}
pub fn health(&self) -> UnitHealth {
self.unit_status.health
}
pub fn payload(&self) -> Option<&payload::Update> {
self.unit_status.payload.as_ref()
}
pub async fn query(&mut self) -> UnitUpdate {
if self.connect().await {
if let Some(update) = self.unit_status.to_update() {
return update
}
}
let conn = match self.connection {
ConnectionStatus::Active(ref mut conn) => conn,
ConnectionStatus::Unconnected | ConnectionStatus::Gone => {
return pending().await
}
};
match conn.updates.recv().await {
Some(update) => {
self.unit_status.apply(&update);
update
}
None => {
self.connection = ConnectionStatus::Gone;
self.unit_status.health = UnitHealth::Gone;
UnitUpdate::Gone
}
}
}
async fn connect(&mut self) -> bool {
if !matches!(self.connection, ConnectionStatus::Unconnected) {
return false
}
let (tx, rx) = oneshot::channel();
if self.commands.send(
GateCommand::Subscribe { suspended: self.suspended, response: tx }
).await.is_err() {
self.connection = ConnectionStatus::Gone;
self.unit_status.health = UnitHealth::Gone;
return true
}
let sub = match rx.await {
Ok(sub) => sub,
Err(_) => {
self.connection = ConnectionStatus::Gone;
self.unit_status.health = UnitHealth::Gone;
return true
}
};
self.connection = ConnectionStatus::Active(LinkConnection {
slot: sub.slot,
updates: sub.receiver,
});
self.unit_status = sub.unit_status;
true
}
pub async fn suspend(&mut self, suspend: bool) {
if self.suspended != suspend {
let conn = match self.connection {
ConnectionStatus::Active(ref mut conn) => conn,
_ => return
};
if self.commands.send(GateCommand::Suspension {
slot: conn.slot,
suspend
}).await.is_err() {
self.unit_status.health = UnitHealth::Gone
}
else {
self.suspended = suspend
}
}
}
}
/// Creates a link from a marked unit name by asking the manager to load it.
impl From<Marked<String>> for Link {
    fn from(marked_name: Marked<String>) -> Link {
        manager::load_link(marked_name)
    }
}
impl From<String> for Link {
fn from(name: String) -> Self {
manager::load_link(name.into())
}
}
impl<'a> From<&'a str> for Link {
fn from(name: &'a str) -> Self {
Self::from(String::from(name))
}
}
/// The status of a gate as reported by `Gate::gate_status`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum GateStatus {
/// The gate's suspended count differs from its number of subscriptions.
///
/// This is the default value.
#[default]
Active,
/// The gate's suspended count equals its number of subscriptions, which
/// includes the case of having no subscriptions.
Dormant,
}
/// The health of a unit.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum UnitHealth {
/// The state set when a payload update is applied; also the default.
#[default]
Healthy,
/// The state set when a `UnitUpdate::Stalled` is applied.
Stalled,
/// The state set when a `UnitUpdate::Gone` is applied or the connection
/// to the unit's gate is lost.
Gone,
}
impl<'a> From<&'a UnitUpdate> for UnitHealth {
fn from(update: &'a UnitUpdate) -> Self {
match update {
UnitUpdate::Payload(_) => Self::Healthy,
UnitUpdate::Stalled => Self::Stalled,
UnitUpdate::Gone => Self::Gone,
}
}
}
impl fmt::Display for UnitHealth {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match *self {
UnitHealth::Healthy => "healthy",
UnitHealth::Stalled => "stalled",
UnitHealth::Gone => "gone",
})
}
}
/// The status of a unit: its health plus the last payload, if any.
#[derive(Clone, Debug, Default)]
struct UnitStatus {
/// The current health of the unit.
health: UnitHealth,
/// The payload of the last payload update; kept when the unit stalls or
/// goes away.
payload: Option<payload::Update>,
}
impl UnitStatus {
fn apply(&mut self, update: &UnitUpdate) -> bool {
match update {
UnitUpdate::Payload(payload) => {
if matches!(self.health, UnitHealth::Healthy)
&& Some(payload) == self.payload.as_ref()
{
false
}
else {
self.health = UnitHealth::Healthy;
self.payload = Some(payload.clone());
true
}
}
UnitUpdate::Stalled => {
if matches!(self.health, UnitHealth::Stalled) {
false
}
else {
self.health = UnitHealth::Stalled;
true
}
}
UnitUpdate::Gone => {
if matches!(self.health, UnitHealth::Gone) {
false
}
else {
self.health = UnitHealth::Gone;
true
}
}
}
}
fn to_update(&self) -> Option<UnitUpdate> {
match self.health {
UnitHealth::Healthy => {
self.payload.as_ref().map(|payload| {
UnitUpdate::Payload(payload.clone())
})
}
UnitHealth::Stalled => Some(UnitUpdate::Stalled),
UnitHealth::Gone => Some(UnitUpdate::Gone),
}
}
}
/// An update sent from a unit through its gate to the connected links.
#[derive(Clone, Debug)]
pub enum UnitUpdate {
/// The unit has produced the contained payload; applying it makes the
/// unit status healthy.
Payload(payload::Update),
/// The unit has stalled.
Stalled,
/// The unit is gone.
Gone
}
/// The error returned by `Gate::process` and `Gate::process_until` when the
/// gate's command channel has been closed.
#[derive(Clone, Copy, Debug)]
pub struct Terminated;
/// A command sent from a link to its gate.
#[derive(Debug)]
enum GateCommand {
/// Change the suspension state of a subscription.
Suspension {
/// The slot of the subscription to change.
slot: usize,
/// The requested suspension state.
suspend: bool,
},
/// Create a new subscription.
Subscribe {
/// Whether the subscription should start out suspended.
suspended: bool,
/// Where the gate sends the result of the subscription.
response: oneshot::Sender<SubscribeResponse>,
}
}
/// The gate's bookkeeping for a single subscription.
#[derive(Debug)]
struct UpdateSender {
/// The sender for updates; set to `None` once sending has failed so the
/// entry can be removed.
sender: Option<mpsc::Sender<UnitUpdate>>,
/// Whether the subscription is suspended and should be skipped when
/// sending updates.
suspended: bool
}
/// The receiving end of a subscription's update channel.
type UpdateReceiver = mpsc::Receiver<UnitUpdate>;
/// The gate's answer to a subscribe command.
#[derive(Debug)]
struct SubscribeResponse {
/// The slot assigned to the new subscription.
slot: usize,
/// The receiver for the subscription's updates.
receiver: UpdateReceiver,
/// A copy of the unit status at the time of subscribing.
unit_status: UnitStatus,
}