use crate::common::frim::FrimMap;
use crate::manager::UpstreamLinkReport;
use crate::metrics::{Metric, MetricType, MetricUnit};
use crate::tracing::Tracer;
use crate::{config::Marked, payload::Update, units::Unit};
use crate::{manager, metrics};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crossbeam_utils::atomic::AtomicCell;
use futures::future::{select, Either, Future};
use futures::pin_mut;
use inetnum::addr::Prefix;
use log::{error, log_enabled, trace, Level};
use rotonda_store::match_options::MatchOptions;
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use std::{
any::Any,
fmt::{self, Debug, Display},
};
use std::{future::pending, sync::atomic::AtomicUsize};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::{timeout_at, Instant};
use uuid::Uuid;
/// Receiver of "direct" (queue-less) gate updates.
///
/// When a link subscribes with a direct update target, the gate calls
/// [`DirectUpdate::direct_update`] for each update instead of pushing the
/// update into an mpsc queue (see `Gate::update_data`).
#[async_trait]
pub trait DirectUpdate {
/// Handles one update pushed by a gate.
async fn direct_update(&self, update: Update);
}
/// Bundle of the bounds a direct update target must satisfy so that it can
/// be stored as `Weak<dyn AnyDirectUpdate>` in gate commands and
/// subscriptions.
pub trait AnyDirectUpdate: Any + Debug + Send + Sync + DirectUpdate {}
/// Capacity of each per-subscriber update queue of a gate created via
/// `Gate::default()`.
pub const DEF_UPDATE_QUEUE_LEN: usize = 8;
/// Capacity of the command channel of every gate (root gates and clones).
const COMMAND_QUEUE_LEN: usize = 16;
/// State only present on a root (non-clone) gate.
#[derive(Debug)]
pub struct NormalGateState {
/// Sender of this gate's own command channel; clones get a copy of it to
/// send `AttachClone`/`DetachClone` back to this gate.
command_sender: mpsc::Sender<GateCommand>,
/// Command senders of the attached clones, keyed by clone ID, used to
/// forward commands to the clones.
clone_senders: Arc<FrimMap<Uuid, mpsc::Sender<GateCommand>>>,
}
/// State only present on a cloned gate.
#[derive(Debug)]
pub struct CloneGateState {
/// ID under which this clone is registered with its root gate.
clone_id: Uuid,
/// Command sender of the root gate, used to attach and detach.
parent_command_sender: mpsc::Sender<GateCommand>,
}
/// Distinguishes a root gate from one of its clones.
#[derive(Debug)]
pub enum GateState {
Normal(NormalGateState),
Clone(CloneGateState),
}
/// Distributes updates to subscribed links and processes the commands sent
/// to it by links, its agent and its clones.
#[derive(Debug)]
pub struct Gate {
/// Gate ID. Shared with the agent and all clones so that a reconfigure
/// can replace it in place.
id: Arc<Mutex<Uuid>>,
/// Name used in log messages.
name: Arc<String>,
/// Receiving end of this gate's command channel.
commands: Arc<RwLock<mpsc::Receiver<GateCommand>>>,
/// Active subscribers by slot. Shared with clones.
updates: Arc<FrimMap<Uuid, UpdateSender>>,
/// Capacity of the queue created for every queue-based subscriber.
queue_size: usize,
/// Suspended subscribers by slot. Shared with clones.
suspended: Arc<FrimMap<Uuid, UpdateSender>>,
/// Update metrics. Shared with clones.
metrics: Arc<GateMetrics>,
/// Root-gate or clone specific state.
state: GateState,
/// Optional tracer notified about updates sent through the gate.
tracer: Option<Arc<Tracer>>,
}
impl Drop for Gate {
fn drop(&mut self) {
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!("Gate[{} ({}{})]: Drop", self.name, clone_txt, self.id());
}
if self.is_clone() {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
tokio::task::block_in_place(move || {
handle.block_on(self.detach());
});
} else {
self.blocking_detach();
}
}
}
}
impl Default for Gate {
    /// Creates a root gate whose subscriber queues hold
    /// `DEF_UPDATE_QUEUE_LEN` updates.
    ///
    /// The [`GateAgent`] created alongside the gate is discarded.
    fn default() -> Self {
        let (gate, _agent) = Self::new(DEF_UPDATE_QUEUE_LEN);
        gate
    }
}
impl Gate {
/// Creates a new root gate together with the agent controlling it.
///
/// `queue_size` is the capacity of the update queue created for each
/// queue-based subscriber. Gate and agent share the gate ID, and the agent
/// holds a sender for the gate's command channel.
pub fn new(queue_size: usize) -> (Gate, GateAgent) {
    let (command_tx, command_rx) = mpsc::channel(COMMAND_QUEUE_LEN);
    let shared_id = Arc::new(Mutex::new(Uuid::new_v4()));

    let agent = GateAgent {
        id: Arc::clone(&shared_id),
        commands: command_tx.clone(),
    };

    let gate = Gate {
        id: shared_id,
        name: Arc::default(),
        commands: Arc::new(RwLock::new(command_rx)),
        updates: Default::default(),
        queue_size,
        suspended: Default::default(),
        metrics: Default::default(),
        state: GateState::Normal(NormalGateState {
            command_sender: command_tx,
            clone_senders: Default::default(),
        }),
        tracer: None,
    };

    if log_enabled!(Level::Trace) {
        trace!("Gate[{} ({})]: New gate created", gate.name, gate.id());
    }

    (gate, agent)
}
/// Consumes the gate and returns its command receiver and its map of active
/// subscribers.
///
/// Used by the `Reconfigure` command to move the internals of a freshly
/// created gate into a running one.
///
/// # Panics
///
/// Panics if, after `self` has been dropped, the command receiver or the
/// subscriber map is still shared with another owner (a clone of this gate,
/// for example, shares the subscriber map).
fn take(
self,
) -> (mpsc::Receiver<GateCommand>, FrimMap<Uuid, UpdateSender>) {
// `Gate` implements `Drop`, so its fields cannot be moved out by
// destructuring. Clone the `Arc`s and drop `self` first so that these
// clones become the sole owners and can be unwrapped.
let commands = self.commands.clone();
let updates = self.updates.clone();
drop(self);
let commands = Arc::try_unwrap(commands).unwrap().into_inner();
let updates = Arc::try_unwrap(updates).unwrap();
(commands, updates)
}
/// Returns the current gate ID.
///
/// The ID sits behind a mutex shared with the agent and all clones, because
/// a reconfigure replaces it in place.
pub fn id(&self) -> Uuid {
    self.id.lock().map(|guard| *guard).unwrap()
}

/// Returns whether this gate is a clone of another gate.
pub fn is_clone(&self) -> bool {
    matches!(self.state, GateState::Clone(_))
}

/// Returns the clone ID of a cloned gate.
///
/// # Panics
///
/// Panics (via `unreachable!`) on a root gate; check [`Gate::is_clone`]
/// first.
fn clone_id(&self) -> Uuid {
    match &self.state {
        GateState::Clone(clone_state) => clone_state.clone_id,
        GateState::Normal(_) => unreachable!(),
    }
}

/// Returns a shared handle to the gate's metrics.
pub fn metrics(&self) -> Arc<GateMetrics> {
    Arc::clone(&self.metrics)
}
/// Asks the root gate to stop forwarding commands to this clone.
///
/// Only applies to cloned gates. On a root gate an error is logged and
/// nothing else happens. If the request cannot be delivered, the root
/// gate's command channel is already closed and there is nothing left to
/// detach from. That case is therefore reported at trace level only.
pub async fn detach(&self) {
    let GateState::Clone(CloneGateState {
        clone_id,
        parent_command_sender,
    }) = &self.state
    else {
        error!(
            "Gate[{} ({})]: Root gates cannot be detached!",
            self.name,
            self.id()
        );
        return;
    };

    if log_enabled!(Level::Trace) {
        let clone_txt = format!("{clone_id} clone of ");
        trace!("Gate[{} ({}{})]: Detach", self.name, clone_txt, self.id());
    }

    let request = GateCommand::DetachClone {
        clone_id: *clone_id,
    };
    if parent_command_sender.send(request).await.is_err() {
        trace!(
            "Gate[{} ({} clone of {})]: Detach request not delivered: parent gate is gone",
            self.name,
            clone_id,
            self.id()
        );
    }
}

/// Blocking variant of [`Gate::detach`] for use outside of any async
/// context.
///
/// Behaves like `detach`: only applies to clones, logs an error on a root
/// gate, and reports an undeliverable request at trace level.
///
/// # Panics
///
/// Like `tokio::sync::mpsc::Sender::blocking_send`, panics when called from
/// within an asynchronous execution context.
pub fn blocking_detach(&self) {
    let GateState::Clone(CloneGateState {
        clone_id,
        parent_command_sender,
    }) = &self.state
    else {
        error!(
            "Gate[{} ({})]: Root gates cannot be blocking detached!",
            self.name,
            self.id()
        );
        return;
    };

    if log_enabled!(Level::Trace) {
        let clone_txt = format!("{clone_id} clone of ");
        trace!(
            "Gate[{} ({}{})]: Blocking detach",
            self.name,
            clone_txt,
            self.id()
        );
    }

    let request = GateCommand::DetachClone {
        clone_id: *clone_id,
    };
    if parent_command_sender.blocking_send(request).is_err() {
        trace!(
            "Gate[{} ({} clone of {})]: Blocking detach request not delivered: parent gate is gone",
            self.name,
            clone_id,
            self.id()
        );
    }
}
/// Installs the tracer that is told about every update sent through the
/// gate, replacing any previous one.
pub fn set_tracer(&mut self, tracer: Arc<Tracer>) {
    let _previous = self.tracer.replace(tracer);
}
/// Processes commands sent to the gate until there is something for the
/// caller to act on.
///
/// Returns:
///
/// * `Ok(status)` once handling a command changed the value returned by
///   [`Gate::get_gate_status`] (compared by variant only);
/// * `Ok(GateStatus::Reconfiguring { .. })` after a `Reconfigure` (root
///   gate) or `FollowReconfigure` (clone) command;
/// * `Ok(GateStatus::ReportLinks { .. })` or `Ok(GateStatus::Triggered { .. })`
///   after a `ReportLinks` or `Trigger` command, which are forwarded to any
///   clones first;
/// * `Err(Terminated)` after a `Terminate` command (also forwarded to
///   clones) or once the command channel has been closed.
///
/// # Panics
///
/// Panics when receiving a command that this kind of gate (root or clone)
/// does not support, as enforced by the `assert!`s and `unreachable!`
/// below.
pub async fn process(&self) -> Result<GateStatus, Terminated> {
// Status before handling any command, used to detect a change.
let status = self.get_gate_status();
loop {
// The write lock on the receiver is only held while waiting for the
// next command. It is released at the end of this block so that the
// `Reconfigure` handler below can replace the receiver.
let command = {
let mut lock = self.commands.write().await;
match lock.recv().await {
Some(command) => command,
None => {
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Command channel has been closed",
self.name,
clone_txt,
self.id()
);
}
return Err(Terminated);
}
}
};
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Received command '{}'",
self.name,
clone_txt,
self.id(),
command
);
}
match command {
// Clones register only with their root gate.
GateCommand::AttachClone { clone_id, tx } => {
match &self.state {
GateState::Normal(state) => {
state.clone_senders.insert(clone_id, tx);
}
GateState::Clone(_) => unreachable!(),
}
}
// A root gate forgets the clone; a clone passes the request on to
// its root gate.
GateCommand::DetachClone { clone_id } => match &self.state {
GateState::Normal(state) => {
let _ = state.clone_senders.remove(&clone_id);
}
GateState::Clone(_) => self.detach().await,
},
GateCommand::Suspension { slot, suspend } => {
self.suspension(slot, suspend)
}
GateCommand::Subscribe {
suspended,
response,
direct_update,
} => {
assert!(
!self.is_clone(),
"Cloned gates do not support the Subscribe command"
);
self.subscribe(suspended, response, direct_update).await
}
GateCommand::Unsubscribe { slot } => {
assert!(
!self.is_clone(),
"Cloned gates do not support the Unsubscribe command"
);
self.unsubscribe(slot).await
}
GateCommand::FollowSubscribe {
slot,
update_sender,
} => {
assert!(
self.is_clone(),
"Only cloned gates support the FollowSubscribe command"
);
self.updates.insert(slot, update_sender);
}
GateCommand::FollowUnsubscribe { slot } => {
assert!(
self.is_clone(),
"Only cloned gates support the FollowUnsubscribe command"
);
self.updates.remove(&slot);
}
GateCommand::Reconfigure {
new_config,
new_gate,
} => {
assert!(
!self.is_clone(),
"Cloned gates do not support the Reconfigure command"
);
// Adopt the new gate's ID. The ID mutex is shared with the agent
// and all clones, so they all see the new ID. The std mutex
// guard is dropped before the next `.await`.
{
let new_id = *new_gate.id.lock().unwrap();
let mut id = self.id.lock().unwrap();
if log_enabled!(log::Level::Trace) {
trace!(
"Gate[{} ({})]: Reconfiguring: new ID={}",
self.name,
id,
new_id
);
}
*id = new_id;
}
// Take over the new gate's command receiver and subscribers; the
// new gate itself is consumed by `take`.
let (new_commands, new_updates) = new_gate.take();
*self.commands.write().await = new_commands;
self.updates.replace(new_updates);
self.notify_clones(GateCommand::FollowReconfigure {
new_config: new_config.clone(),
})
.await;
return Ok(GateStatus::Reconfiguring { new_config });
}
GateCommand::FollowReconfigure { new_config } => {
assert!(
self.is_clone(),
"Only cloned gates support the FollowReconfigure command"
);
return Ok(GateStatus::Reconfiguring { new_config });
}
GateCommand::ReportLinks { report } => {
self.notify_clones(GateCommand::ReportLinks {
report: report.clone(),
})
.await;
return Ok(GateStatus::ReportLinks { report });
}
GateCommand::Trigger { data } => {
self.notify_clones(GateCommand::Trigger {
data: data.clone(),
})
.await;
return Ok(GateStatus::Triggered { data });
}
GateCommand::Terminate => {
self.notify_clones(GateCommand::Terminate).await;
return Err(Terminated);
}
}
// `GateStatus` equality compares variants only.
let new_status = self.get_gate_status();
if new_status != status {
return Ok(new_status);
}
}
}
/// Forwards `cmd` to every attached clone of this (root) gate.
///
/// Cloned gates do not track clones, so on a clone this does nothing.
/// Clones whose command channel is closed are skipped and removed from the
/// registry afterwards. This includes a clone that is dropped between the
/// closed-check and the send: the failed send is treated like a closed
/// channel instead of panicking and taking down the root gate.
///
/// # Panics
///
/// Panics if `cmd` cannot be cloned (see the `Clone` implementation of
/// `GateCommand`) and at least one clone is still open.
async fn notify_clones(&self, cmd: GateCommand) {
    let GateState::Normal(NormalGateState { clone_senders, .. }) = &self.state
    else {
        return;
    };

    let mut closed_sender_found = false;
    for (uuid, sender) in clone_senders.guard().iter() {
        if sender.is_closed() {
            trace!(
                "Gate[{} ({})]: Unable to notify clone {} of command '{}': sender is closed",
                self.name,
                self.id(),
                uuid,
                cmd
            );
            closed_sender_found = true;
            continue;
        }

        trace!(
            "Gate[{} ({})]: Notifying clone {} of command '{}'",
            self.name,
            self.id(),
            uuid,
            cmd
        );
        // The clone may have been dropped since the check above; a failed
        // send only means its receiver is gone.
        if sender.send(cmd.clone()).await.is_err() {
            trace!(
                "Gate[{} ({})]: Unable to notify clone {} of command '{}': sender is closed",
                self.name,
                self.id(),
                uuid,
                cmd
            );
            closed_sender_found = true;
        }
    }

    if closed_sender_found {
        clone_senders.retain(|_uuid, sender| !sender.is_closed());
    }
}
/// Processes gate commands until `fut` completes.
///
/// Returns `Ok` with the output of `fut`, or `Err(Terminated)` if the gate
/// is terminated first. Statuses reported by [`Gate::process`] are ignored;
/// command processing simply continues.
pub async fn process_until<Fut: Future>(
&self,
fut: Fut,
) -> Result<Fut::Output, Terminated> {
pin_mut!(fut);
loop {
// A fresh `process` future is created per iteration, while `fut`
// stays pinned and keeps its progress: `select` hands the pinned
// reference back as `next_fut`.
let process = self.process();
pin_mut!(process);
match select(process, fut).await {
Either::Left((Err(_), _)) => return Err(Terminated),
Either::Left((Ok(_), next_fut)) => {
fut = next_fut;
}
Either::Right((res, _)) => return Ok(res),
}
}
}
/// Processes gate commands for up to `secs` seconds.
///
/// Statuses reported by [`Gate::process`] are ignored. Returns `Ok(())`
/// once the deadline has passed, or `Err(Terminated)` as soon as the gate
/// is terminated.
pub async fn wait(&self, secs: u64) -> Result<(), Terminated> {
    let deadline = Instant::now() + Duration::from_secs(secs);
    while Instant::now() < deadline {
        match timeout_at(deadline, self.process()).await {
            Err(_elapsed) => return Ok(()),
            Ok(Err(Terminated)) => return Err(Terminated),
            Ok(Ok(_status)) => {}
        }
    }
    Ok(())
}
/// Sends `update` to every active subscriber and records update metrics.
///
/// Queue-based subscribers get the update through their queue; the send is
/// awaited, so a full queue makes this method wait. Direct-update
/// subscribers have their target's `direct_update` called, provided the
/// weakly referenced target is still alive. When a tracer is set, a gate
/// event is noted for every payload returned by `update.trace_ids()`,
/// once per subscriber. The update counts as dropped in the metrics if it
/// was delivered to no subscriber.
pub async fn update_data(&self, update: Update) {
let mut sent_at_least_once = false;
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Starting update",
self.name,
clone_txt,
self.id()
);
}
for (uuid, item) in self.updates.guard().iter() {
match (&item.queue, &item.direct) {
// Queue-based subscriber.
(Some(sender), None) => {
if let Some(tracer) = &self.tracer {
for payload in update.trace_ids() {
// NOTE(review): assumes `trace_ids()` only yields payloads that
// carry a trace ID -- confirm.
tracer.note_gate_event(
payload.trace_id().unwrap(),
self.id(),
format!("Sent by queue from gate {} to slot {uuid}: {payload:#?}", self.id()),
);
}
}
// A failed send means the receiver is gone; the update is then
// just not counted as delivered for this slot.
if sender.send(Ok(update.clone())).await.is_ok() {
sent_at_least_once = true;
continue;
}
}
// Direct-update subscriber.
(None, Some(direct)) => {
if log_enabled!(log::Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Sending direct update for slot {}",
self.name,
clone_txt,
self.id(),
uuid
);
}
if let Some(tracer) = &self.tracer {
for payload in update.trace_ids() {
tracer.note_gate_event(
payload.trace_id().unwrap(),
self.id(),
format!(
"Sent by direct update from gate {} to slot {uuid}: {payload:#?}",
self.id()
),
);
}
}
// The target is only weakly referenced; skip it if it has been
// dropped.
if let Some(direct) = direct.upgrade() {
direct.direct_update(update.clone()).await;
sent_at_least_once = true;
}
continue;
}
// Slots with neither or both delivery mechanisms are skipped.
_ => {}
}
}
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Finished update",
self.name,
clone_txt,
self.id()
);
}
self.metrics.update(
&update,
self.updates.clone(),
sent_at_least_once,
);
}
/// Returns whether the gate currently has any active subscriber.
///
/// Suspending a subscriber moves its slot from the active map (`updates`)
/// to the suspended map (see `suspension`), so the two maps never share a
/// slot and `updates` holds exactly the subscribers that want updates.
/// The gate is [`GateStatus::Dormant`] when that map is empty, including
/// when there are no subscribers at all, and [`GateStatus::Active`]
/// otherwise. (Comparing the sizes of the two maps, as done previously,
/// reported e.g. one active plus one suspended subscriber as dormant.)
pub fn get_gate_status(&self) -> GateStatus {
    if self.updates.is_empty() {
        GateStatus::Dormant
    } else {
        GateStatus::Active
    }
}
/// Moves the subscriber in `slot` between the active and the suspended map.
///
/// With `suspend` set, an active subscriber is suspended; otherwise a
/// suspended subscriber is reactivated. Slots not found in the source map
/// are ignored.
fn suspension(&self, slot: Uuid, suspend: bool) {
    let (source, target) = if suspend {
        (&self.updates, &self.suspended)
    } else {
        (&self.suspended, &self.updates)
    };
    if let Some(sender) = source.remove(&slot) {
        target.insert(slot, sender);
    }
}
/// Registers a new subscriber and answers the subscription request.
///
/// With a `direct_update` target, updates are delivered by calling the
/// target and no receiver is handed out. Otherwise a queue of `queue_size`
/// updates is created and its receiver returned. The new slot starts out
/// suspended if `suspended` is set. If the requester has gone away before
/// the response could be delivered, the slot is removed again; otherwise
/// the new subscriber is announced to all clones.
async fn subscribe(
    &self,
    suspended: bool,
    response: oneshot::Sender<SubscribeResponse>,
    direct_update: Option<Weak<dyn AnyDirectUpdate>>,
) {
    let (update_sender, receiver) = match direct_update {
        Some(target) => {
            let sender = UpdateSender {
                queue: None,
                direct: Some(target),
            };
            (sender, None)
        }
        None => {
            let (queue_tx, queue_rx) = mpsc::channel(self.queue_size);
            let sender = UpdateSender {
                queue: Some(queue_tx),
                direct: None,
            };
            (sender, Some(queue_rx))
        }
    };

    let slot = Uuid::new_v4();
    let registry = if suspended {
        &self.suspended
    } else {
        &self.updates
    };
    registry.insert(slot, update_sender.clone());

    match response.send(SubscribeResponse { slot, receiver }) {
        Ok(()) => {
            self.notify_clones(GateCommand::FollowSubscribe {
                slot,
                update_sender,
            })
            .await
        }
        Err(unsent) => {
            registry.remove(&unsent.slot);
        }
    }
}
/// Removes the subscriber in `slot`, whether it is suspended or active,
/// and tells all clones to do the same.
async fn unsubscribe(&self, slot: Uuid) {
    for registry in [&self.suspended, &self.updates] {
        registry.remove(&slot);
    }
    self.notify_clones(GateCommand::FollowUnsubscribe { slot })
        .await;
}
/// Sets the name identifying this gate in log messages.
///
/// Existing clones keep the name they were created with.
pub(crate) fn set_name(&mut self, name: &str) {
    self.name = Arc::new(String::from(name));
}

/// Returns the gate's name.
pub fn name(&self) -> Arc<String> {
    Arc::clone(&self.name)
}
}
impl Clone for Gate {
fn clone(&self) -> Self {
let (tx, rx) = mpsc::channel(COMMAND_QUEUE_LEN);
let clone_id = Uuid::new_v4();
if log_enabled!(Level::Trace) {
let clone_txt = if self.is_clone() {
format!("{} clone of ", self.clone_id())
} else {
String::new()
};
trace!(
"Gate[{} ({}{})]: Cloning gate to new clone id {}",
self.name,
clone_txt,
self.id(),
clone_id
);
}
let parent_command_sender = match &self.state {
GateState::Normal(state) => state.command_sender.clone(),
GateState::Clone(state) => state.parent_command_sender.clone(),
};
let gate = Gate {
id: self.id.clone(),
name: self.name.clone(),
commands: Arc::new(RwLock::new(rx)),
updates: self.updates.clone(),
queue_size: self.queue_size,
suspended: self.suspended.clone(),
metrics: self.metrics.clone(),
state: GateState::Clone(CloneGateState {
clone_id,
parent_command_sender: parent_command_sender.clone(),
}),
tracer: self.tracer.clone(),
};
let cloned_name = self.name.clone();
let copied_id = self.id();
crate::tokio::spawn("gate-attach-clone", async move {
let saved_clone_id = clone_id;
if let Err(_err) = parent_command_sender
.send(GateCommand::AttachClone { clone_id, tx })
.await
{
let clone_txt = format!("{} clone of ", saved_clone_id);
error!(
"Gate[{} ({}{})]: Failed to attach clone to parent {}",
cloned_name, clone_txt, copied_id, clone_id
);
}
});
gate
}
}
/// Handle controlling a [`Gate`] from the outside: it creates links to the
/// gate, hands it reconfigurations and link reports, and terminates it.
#[derive(Clone, Debug)]
pub struct GateAgent {
    /// Gate ID, shared with the gate so that reconfigurations show up here.
    id: Arc<Mutex<Uuid>>,
    /// Sender of the gate's command channel.
    commands: mpsc::Sender<GateCommand>,
}

impl GateAgent {
    /// Returns the current ID of the controlled gate.
    pub fn id(&self) -> Uuid {
        self.id.lock().map(|guard| *guard).unwrap()
    }

    /// Creates a new, not yet connected link to the gate.
    pub fn create_link(&mut self) -> Link {
        let gate_id = self.id();
        Link::new(gate_id, self.commands.clone())
    }

    /// Asks the gate to terminate; a gate that is already gone is ignored.
    pub async fn terminate(&self) {
        self.commands.send(GateCommand::Terminate).await.ok();
    }

    /// Returns whether the gate's command channel has been closed.
    pub fn is_terminated(&self) -> bool {
        self.commands.is_closed()
    }

    /// Asks the gate to take over `new_gate`'s internals and to report
    /// `new_config` as its new configuration.
    ///
    /// # Errors
    ///
    /// Returns the send error as a string if the gate is gone.
    pub async fn reconfigure(
        &self,
        new_config: Unit,
        new_gate: Gate,
    ) -> Result<(), String> {
        let command = GateCommand::Reconfigure {
            new_config,
            new_gate,
        };
        self.commands.send(command).await.map_err(|err| err.to_string())
    }

    /// Hands a link report to the gate.
    ///
    /// # Errors
    ///
    /// Returns the send error as a string if the gate is gone.
    pub async fn report_links(
        &self,
        report: UpstreamLinkReport,
    ) -> Result<(), String> {
        let command = GateCommand::ReportLinks { report };
        self.commands.send(command).await.map_err(|err| err.to_string())
    }
}
/// Provides a short status summary, e.g. for status reporting output.
pub trait GraphStatus: Send + Sync {
/// Returns a human-readable status text.
fn status_text(&self) -> String;
/// Returns whether the status is considered okay, or `None` if that cannot
/// be told. The default implementation returns `None`.
fn okay(&self) -> Option<bool> {
None
}
}
/// Counters and timestamps describing the updates passed to a gate.
#[derive(Debug, Default)]
pub struct GateMetrics {
/// Number of items in the most recent bulk update.
pub update_set_size: AtomicUsize,
/// Time of the most recent update, if any.
pub update: AtomicCell<Option<DateTime<Utc>>>,
/// Total number of updates passed to the gate.
pub num_updates: AtomicUsize,
/// Number of updates that were delivered to no subscriber.
pub num_dropped_updates: AtomicUsize,
}
impl GraphStatus for GateMetrics {
    /// Reports the total number of updates, e.g. `out: 42`.
    fn status_text(&self) -> String {
        let sent = self.num_updates.load(SeqCst);
        format!("out: {sent}")
    }
}
impl GateMetrics {
    /// Records that `update` has been passed to the gate.
    ///
    /// Always bumps the update counter and stores the current time as the
    /// time of the last update. In addition, it counts the update as
    /// dropped when no subscriber received it and, for bulk updates,
    /// remembers the number of items. `_senders` is currently unused.
    fn update(
        &self,
        update: &Update,
        _senders: Arc<FrimMap<Uuid, UpdateSender>>,
        sent_at_least_once: bool,
    ) {
        self.num_updates.fetch_add(1, SeqCst);

        let dropped = !sent_at_least_once;
        if dropped {
            self.num_dropped_updates.fetch_add(1, SeqCst);
        }

        if let Update::Bulk(items) = update {
            let item_count = items.len();
            self.update_set_size.store(item_count, SeqCst);
        }

        let now = Some(Utc::now());
        self.update.store(now);
    }
}
/// Metric descriptions reported by the `metrics::Source` implementation of
/// `GateMetrics`.
impl GateMetrics {
/// Counter backed by `num_updates`.
const NUM_UPDATES_METRIC: Metric = Metric::new(
"num_updates",
"the number of updates sent through the gate",
MetricType::Counter,
MetricUnit::Total,
);
/// Counter backed by `num_dropped_updates`.
const NUM_DROPPED_UPDATES_METRIC: Metric = Metric::new(
"num_dropped_updates",
"the number of updates that could not be sent through the gate",
MetricType::Counter,
MetricUnit::Total,
);
/// Gauge backed by `update_set_size` (bulk updates only).
const UPDATE_SET_SIZE_METRIC: Metric = Metric::new(
"update_set_size",
"the number of set items in the last update",
MetricType::Gauge,
MetricUnit::Total,
);
/// Text metric with the time of the last update, or `N/A`.
const UPDATE_WHEN_METRIC: Metric = Metric::new(
"last_update",
"the date and time of the last update",
MetricType::Text,
MetricUnit::Info,
);
/// Gauge with the seconds since the last update, or `-1`.
const UPDATE_AGO_METRIC: Metric = Metric::new(
"since_last_update",
"the number of seconds since the last update",
MetricType::Gauge,
MetricUnit::Second,
);
}
impl metrics::Source for GateMetrics {
    /// Appends the gate metrics for `unit_name` to `target`.
    ///
    /// The two update counters are always reported. Before the first
    /// update, the last-update time is reported as `N/A`, the seconds since
    /// then as `-1`, and the update set size is left out.
    fn append(&self, unit_name: &str, target: &mut metrics::Target) {
        let unit = Some(unit_name);

        target.append_simple(
            &Self::NUM_UPDATES_METRIC,
            unit,
            self.num_updates.load(SeqCst),
        );
        target.append_simple(
            &Self::NUM_DROPPED_UPDATES_METRIC,
            unit,
            self.num_dropped_updates.load(SeqCst),
        );

        let Some(last_update) = self.update.load() else {
            target.append_simple(&Self::UPDATE_WHEN_METRIC, unit, "N/A");
            target.append_simple(&Self::UPDATE_AGO_METRIC, unit, -1);
            return;
        };

        target.append_simple(&Self::UPDATE_WHEN_METRIC, unit, last_update);
        let seconds_ago =
            Utc::now().signed_duration_since(last_update).num_seconds();
        target.append_simple(&Self::UPDATE_AGO_METRIC, unit, seconds_ago);
        target.append_simple(
            &Self::UPDATE_SET_SIZE_METRIC,
            unit,
            self.update_set_size.load(SeqCst),
        );
    }
}
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "String")]
pub struct DirectLink(Link);
impl DirectLink {
pub fn id(&self) -> Uuid {
self.0.id()
}
pub fn gate_id(&self) -> Uuid {
self.0.gate_id()
}
pub fn connected_gate_slot(&self) -> Option<Uuid> {
self.0.connection.as_ref().map(|connection| connection.slot)
}
pub async fn suspend(&mut self) {
self.0.suspend().await
}
pub fn get_status(&self) -> UnitStatus {
self.0.get_status()
}
pub async fn connect(
&mut self,
direct_update_target: Arc<dyn AnyDirectUpdate>,
suspended: bool,
) -> Result<(), UnitStatus> {
self.0.set_direct_update_target(direct_update_target);
self.0.connect(suspended).await
}
pub async fn disconnect(&mut self) {
self.0.direct_update_target = None;
self.0.disconnect().await;
}
}
impl From<Link> for DirectLink {
fn from(link: Link) -> Self {
DirectLink(link)
}
}
impl From<Marked<String>> for DirectLink {
fn from(name: Marked<String>) -> Self {
DirectLink(manager::load_link(name))
}
}
impl From<String> for DirectLink {
fn from(name: String) -> Self {
DirectLink(manager::load_link(name.into()))
}
}
/// An established subscription of a link to its gate.
#[derive(Debug)]
struct LinkConnection {
/// Slot assigned by the gate to this subscription.
slot: Uuid,
/// Receiver of queued updates; `None` for direct-update subscriptions.
updates: Option<UpdateReceiver>,
}
/// The receiving side of a connection to a gate.
///
/// A link is created unconnected (e.g. by `GateAgent::create_link`) and
/// subscribes to its gate when connected, which `query` and
/// `query_suspended` do on demand. It can be deserialized from a link name
/// string.
#[derive(Deserialize)]
#[serde(from = "String")]
pub struct Link {
/// ID of this link; two links are equal when their IDs are.
id: Uuid,
/// ID of the gate this link belongs to.
gate_id: Uuid,
/// Sender of the gate's command channel.
commands: mpsc::Sender<GateCommand>,
/// Present while subscribed to the gate.
connection: Option<LinkConnection>,
/// Last known status of the upstream unit.
unit_status: UnitStatus,
/// Whether the subscription is suspended.
suspended: bool,
/// Target used for direct updates when subscribing, if any.
direct_update_target: Option<Weak<dyn AnyDirectUpdate>>,
}
impl PartialEq for Link {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Link {}
impl std::fmt::Debug for Link {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Link")
.field("id", &self.id)
.field("gate_id", &self.gate_id)
.field("connection", &self.connected_gate_slot())
.field("unit_status", &self.unit_status)
.field("suspended", &self.suspended)
.field(
"direct_update_target",
&self.direct_update_target.is_some(),
)
.finish()
}
}
impl Clone for Link {
fn clone(&self) -> Self {
Self {
id: self.id,
gate_id: self.gate_id,
commands: self.commands.clone(),
connection: None,
unit_status: self.unit_status,
suspended: self.suspended,
direct_update_target: self.direct_update_target.clone(),
}
}
}
impl Link {
fn new(gate_id: Uuid, commands: mpsc::Sender<GateCommand>) -> Self {
Link {
id: Uuid::new_v4(),
gate_id,
commands,
connection: None,
unit_status: UnitStatus::Healthy,
suspended: false,
direct_update_target: None,
}
}
pub fn id(&self) -> Uuid {
self.id
}
pub fn gate_id(&self) -> Uuid {
self.gate_id
}
pub fn connected_gate_slot(&self) -> Option<Uuid> {
self.connection.as_ref().map(|connection| connection.slot)
}
pub fn close(&mut self) {
if let Some(conn) = self.connection.as_mut() {
if let Some(updates) = &mut conn.updates {
updates.close();
}
}
}
/// Waits for the next update from the gate, connecting first if needed.
///
/// For direct-update subscriptions (which have no queue) this never
/// completes.
///
/// # Errors
///
/// Returns the status sent by the gate, or `UnitStatus::Gone` once the
/// queue has been closed. Either way the status is remembered as the
/// link's status. Connection errors from [`Link::connect`] are passed on.
pub async fn query(&mut self) -> Result<Update, UnitStatus> {
    self.connect(false).await?;
    let receiver = match self.connection.as_mut().unwrap().updates.as_mut() {
        Some(receiver) => receiver,
        None => return pending().await,
    };
    let status = match receiver.recv().await {
        Some(Ok(update)) => return Ok(update),
        Some(Err(status)) => status,
        None => UnitStatus::Gone,
    };
    self.unit_status = status;
    Err(status)
}
/// Connects as a suspended subscriber, if not yet connected, and waits until
/// the gate reports a status.
///
/// Updates that arrive in the meantime are discarded. The returned status
/// is also remembered as the link's status (see [`Link::get_status`]),
/// just like [`Link::query`] does. Previously a status sent by the gate was
/// returned without being recorded, leaving `get_status` stale. For
/// direct-update subscriptions (which have no queue) this never completes.
/// If connecting fails, the connection error is returned.
pub async fn query_suspended(&mut self) -> UnitStatus {
    if let Err(err) = self.connect(true).await {
        return err;
    }
    // After a successful `connect` the connection is always present.
    let Some(receiver) = self
        .connection
        .as_mut()
        .and_then(|connection| connection.updates.as_mut())
    else {
        return pending().await;
    };
    let status = loop {
        match receiver.recv().await {
            Some(Ok(_)) => continue,
            Some(Err(status)) => break status,
            None => break UnitStatus::Gone,
        }
    };
    self.unit_status = status;
    status
}
/// Suspends the link's subscription unless it is suspended already.
pub async fn suspend(&mut self) {
    if self.suspended {
        return;
    }
    self.request_suspend(true).await
}

/// Asks the gate to suspend (`suspend == true`) or resume this link's slot.
///
/// Does nothing while unconnected. If the gate cannot be reached, the unit
/// status becomes `Gone`; otherwise the local suspended flag is updated.
async fn request_suspend(&mut self, suspend: bool) {
    let Some(slot) = self.connected_gate_slot() else {
        return;
    };
    let request = GateCommand::Suspension { slot, suspend };
    match self.commands.send(request).await {
        Ok(()) => self.suspended = suspend,
        Err(_) => self.unit_status = UnitStatus::Gone,
    }
}

/// Returns the last known status of the upstream unit.
pub fn get_status(&self) -> UnitStatus {
    self.unit_status
}
pub async fn connect(
&mut self,
suspended: bool,
) -> Result<(), UnitStatus> {
if self.connection.is_some() {
return Ok(());
}
if let UnitStatus::Gone = self.unit_status {
return Err(UnitStatus::Gone);
}
let (tx, rx) = oneshot::channel();
if self
.commands
.send(GateCommand::Subscribe {
suspended,
response: tx,
direct_update: self.direct_update_target.clone(),
})
.await
.is_err()
{
self.unit_status = UnitStatus::Gone;
return Err(UnitStatus::Gone);
}
let sub = match rx.await {
Ok(sub) => sub,
Err(_) => {
self.unit_status = UnitStatus::Gone;
return Err(UnitStatus::Gone);
}
};
self.connection = Some(LinkConnection {
slot: sub.slot,
updates: sub.receiver,
});
if log_enabled!(log::Level::Trace) {
trace!(
"Link[{}]: connected to gate slot {}",
self.id(),
sub.slot
);
}
self.unit_status = UnitStatus::Healthy;
self.suspended = suspended;
Ok(())
}
/// Unsubscribes the link from its gate, if connected.
///
/// A gate that cannot be reached is ignored; the link is unconnected
/// afterwards either way.
pub async fn disconnect(&mut self) {
    if let Some(slot) = self.connected_gate_slot() {
        let _ = self
            .commands
            .send(GateCommand::Unsubscribe { slot })
            .await;
        if log_enabled!(log::Level::Trace) {
            trace!(
                "Link[{}]: disconnected from gate slot {}",
                self.id(),
                slot
            );
        }
    }
    self.connection = None;
}

/// Sends a trigger carrying `data` to the gate. A gate that cannot be
/// reached is ignored.
pub async fn trigger(&self, data: TriggerData) {
    let command = GateCommand::Trigger { data };
    self.commands.send(command).await.ok();
    if log_enabled!(log::Level::Trace) {
        trace!("Link[{}]: sent trigger to gate slot", self.id());
    }
}

/// Sets the target for direct updates.
///
/// Only a weak reference is kept, so the link does not keep the target
/// alive. The target is handed to the gate when subscribing, so it only
/// affects connections made afterwards.
pub fn set_direct_update_target(
    &mut self,
    direct_update_target: Arc<dyn AnyDirectUpdate>,
) {
    let weak_target = Arc::downgrade(&direct_update_target);
    self.direct_update_target = Some(weak_target);
}
}
impl Drop for Link {
    /// Unsubscribes a connected link from its gate.
    ///
    /// `drop` cannot await, so the `Unsubscribe` command is sent from a
    /// spawned task.
    fn drop(&mut self) {
        let Some(slot) = self.connected_gate_slot() else {
            return;
        };
        let link_id = self.id();
        let commands = self.commands.clone();
        crate::tokio::spawn("drop-link", async move {
            let _ = commands.send(GateCommand::Unsubscribe { slot }).await;
            if log_enabled!(log::Level::Trace) {
                trace!(
                    "Link[{}]: disconnected from gate slot {} on drop",
                    link_id,
                    slot
                );
            }
        });
    }
}
impl From<Marked<String>> for Link {
    /// Loads the link named `name` via the manager.
    fn from(name: Marked<String>) -> Self {
        manager::load_link(name)
    }
}

impl From<String> for Link {
    /// Loads the link named `name` via the manager.
    fn from(name: String) -> Self {
        let marked: Marked<String> = name.into();
        Self::from(marked)
    }
}
/// What [`Gate::process`] reports back to its caller.
#[derive(Debug, Default)]
pub enum GateStatus {
/// The gate is active (the default).
#[default]
Active,
/// The gate is dormant; see [`Gate::get_gate_status`].
Dormant,
/// A reconfiguration was received; carries the new unit configuration.
Reconfiguring { new_config: Unit },
/// A link report was received.
ReportLinks { report: UpstreamLinkReport },
/// A trigger was received.
Triggered { data: TriggerData },
}
impl Eq for GateStatus {}
impl PartialEq for GateStatus {
fn eq(&self, other: &Self) -> bool {
core::mem::discriminant(self) == core::mem::discriminant(other)
}
}
impl Display for GateStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
GateStatus::Active => f.write_str("Active"),
GateStatus::Dormant => f.write_str("Dormant"),
GateStatus::Reconfiguring { .. } => f.write_str("Reconfiguring"),
GateStatus::ReportLinks { .. } => f.write_str("ReportLinks"),
GateStatus::Triggered { .. } => f.write_str("Triggered"),
}
}
}
/// Status of an upstream unit as seen by a link.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum UnitStatus {
/// The unit is working normally (the default).
#[default]
Healthy,
/// The unit has stalled.
Stalled,
/// The unit, or its gate, is gone.
Gone,
}
impl fmt::Display for UnitStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match *self {
UnitStatus::Healthy => "healthy",
UnitStatus::Stalled => "stalled",
UnitStatus::Gone => "gone",
})
}
}
/// Signals that a gate has been terminated.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Terminated;
/// Data carried by a trigger sent from a link to its gate.
#[derive(Clone, Debug)]
pub enum TriggerData {
/// Prefix match request. NOTE(review): the meaning of the `Uuid` is not
/// visible here -- confirm with the senders.
MatchPrefix(Uuid, Prefix, MatchOptions),
}
/// Commands sent to a gate through its command channel.
#[derive(Debug)]
enum GateCommand {
/// Hand a link report to the gate; forwarded to clones.
ReportLinks {
report: UpstreamLinkReport,
},
/// Suspend (`suspend == true`) or resume the subscriber in `slot`.
Suspension {
slot: Uuid,
suspend: bool,
},
/// Subscribe a new link; the assigned slot and optional queue receiver
/// are returned through `response`. Root gates only.
Subscribe {
suspended: bool,
response: oneshot::Sender<SubscribeResponse>,
direct_update: Option<Weak<dyn AnyDirectUpdate>>,
},
/// Remove the subscriber in `slot`. Root gates only.
Unsubscribe {
slot: Uuid,
},
/// Root to clone: a subscriber was added.
FollowSubscribe {
slot: Uuid,
update_sender: UpdateSender,
},
/// Root to clone: a subscriber was removed.
FollowUnsubscribe {
slot: Uuid,
},
/// Take over `new_gate`'s internals and report `new_config`. Root gates
/// only.
Reconfigure {
new_config: Unit,
new_gate: Gate,
},
/// Root to clone: a reconfiguration happened.
FollowReconfigure {
new_config: Unit,
},
/// A trigger sent by a link; forwarded to clones.
Trigger {
data: TriggerData,
},
/// Terminate the gate; forwarded to clones.
Terminate,
/// Clone to root: register the clone's command sender.
AttachClone {
clone_id: Uuid,
tx: Sender<GateCommand>,
},
/// Clone to root: unregister the clone.
DetachClone {
clone_id: Uuid,
},
}
impl Clone for GateCommand {
fn clone(&self) -> Self {
match self {
Self::FollowSubscribe {
slot,
update_sender,
} => Self::FollowSubscribe {
slot: *slot,
update_sender: update_sender.clone(),
},
Self::FollowUnsubscribe { slot } => {
Self::FollowUnsubscribe { slot: *slot }
}
Self::FollowReconfigure { new_config } => {
Self::FollowReconfigure {
new_config: new_config.clone(),
}
}
Self::Terminate => Self::Terminate,
Self::ReportLinks { report } => Self::ReportLinks {
report: report.clone(),
},
_ => panic!("Internal error: Unclonable GateCommand"),
}
}
}
impl Display for GateCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
GateCommand::ReportLinks { .. } => f.write_str("ReportLinks"),
GateCommand::Suspension { .. } => f.write_str("Suspension"),
GateCommand::Subscribe { .. } => f.write_str("Subscribe"),
GateCommand::Unsubscribe { .. } => f.write_str("Unsubscribe"),
GateCommand::FollowSubscribe { .. } => {
f.write_str("FollowSubscribe")
}
GateCommand::FollowUnsubscribe { .. } => {
f.write_str("FollowUnsubscribe")
}
GateCommand::Reconfigure { .. } => f.write_str("Reconfigure"),
GateCommand::FollowReconfigure { .. } => {
f.write_str("FollowReconfigure")
}
GateCommand::Trigger { .. } => f.write_str("Trigger"),
GateCommand::Terminate => f.write_str("Terminate"),
GateCommand::AttachClone { .. } => f.write_str("AttachClone"),
GateCommand::DetachClone { .. } => f.write_str("DetachClone"),
}
}
}
/// How a gate delivers updates to one subscriber slot. `subscribe` sets
/// exactly one of the two fields.
#[derive(Clone, Debug)]
struct UpdateSender {
/// Queue sender of a queue-based subscriber.
queue: Option<mpsc::Sender<Result<Update, UnitStatus>>>,
/// Weakly referenced target of a direct-update subscriber.
direct: Option<Weak<dyn AnyDirectUpdate>>,
}
/// Receiving end of a subscriber's update queue: updates or a unit status.
type UpdateReceiver = mpsc::Receiver<Result<Update, UnitStatus>>;
/// Answer to a `Subscribe` command.
#[derive(Debug)]
struct SubscribeResponse {
/// Slot assigned to the new subscriber.
slot: Uuid,
/// Queue receiver; `None` for direct-update subscriptions.
receiver: Option<UpdateReceiver>,
}
/// Tests for gates, links and gate clones.
#[cfg(test)]
mod tests {
use chrono::SubsecRound;
use smallvec::smallvec;
use tokio::sync::Notify;
use crate::{
payload::Payload,
tests::util::internal::{
enable_logging, get_testable_metrics_snapshot,
},
};
use super::*;
/// Walks a gate and a direct-update link through their life cycle: metrics
/// before any update, an update dropped for lack of subscribers, delivery
/// of a bulk update to the direct update target, and the
/// `since_last_update` metric advancing over time.
#[tokio::test(flavor = "multi_thread")]
#[cfg(not(tarpaulin))]
async fn gate_link_lifecycle_test() {
use std::str::FromStr;
use routecore::bgp::{message::PduParseInfo, nlri::afisafi::Ipv4UnicastNlri, path_attributes::OwnedPathAttributes};
use crate::{payload::{RotondaPaMap, RotondaRoute}, roto_runtime::types::RouteContext};
/// Builds the fixed payload used throughout this test.
fn mk_test_payload() -> Payload {
Payload::new(
RotondaRoute::Ipv4Unicast(
Ipv4UnicastNlri::from_str("1.2.3.0/24").unwrap(),
RotondaPaMap::new(
OwnedPathAttributes::new(
PduParseInfo::modern(),
vec![]
)
)
),
RouteContext::for_reprocessing(),
None
)
}
eprintln!("STARTING");
let (gate, mut agent) = Gate::new(1);
let mut link = agent.create_link();
/// Direct update target that checks the received bulk update and then
/// wakes up the test via the `Notify`.
#[derive(Debug)]
struct TestDirectUpdateTarget(Arc<Notify>);
#[async_trait]
impl DirectUpdate for TestDirectUpdateTarget {
async fn direct_update(&self, update: Update) {
assert!(matches!(update, Update::Bulk(_)));
if let Update::Bulk(payload) = update {
assert_eq!(payload.len(), 2);
assert_eq!(payload[0], mk_test_payload());
assert_eq!(payload[1], mk_test_payload());
self.0.notify_one();
}
}
}
impl AnyDirectUpdate for TestDirectUpdateTarget {}
let notify = Arc::new(Notify::default());
let test_target = Arc::new(TestDirectUpdateTarget(notify.clone()));
eprintln!("SETTING LINK TO DIRECT UPDATE MODE");
link.set_direct_update_target(test_target.clone());
// Process gate commands in the background for the rest of the test.
let gate = Arc::new(gate);
let gate_clone = gate.clone();
tokio::spawn(async move {
loop {
gate.process().await.unwrap();
}
});
let metrics = get_testable_metrics_snapshot(&gate_clone.metrics());
assert_eq!(metrics.with_name::<usize>("num_updates"), 0);
assert_eq!(metrics.with_name::<String>("last_update"), "N/A");
assert_eq!(metrics.with_name::<String>("since_last_update"), "-1");
eprintln!(
"TESTING THAT UPDATES ARE DROPPED WHEN THERE IS NO DOWNSTREAM"
);
let update = Update::Single(mk_test_payload());
gate_clone.update_data(update).await;
let metrics = get_testable_metrics_snapshot(&gate_clone.metrics());
assert_eq!(metrics.with_name::<usize>("num_updates"), 1);
assert_eq!(metrics.with_name::<usize>("num_dropped_updates"), 1);
assert_eq!(
metrics
.with_name::<DateTime<Utc>>("last_update")
.round_subsecs(0),
Utc::now().round_subsecs(0)
);
assert!(metrics.with_name::<i64>("since_last_update") >= 0);
eprintln!("CONNECTING LINK TO GATE");
link.connect(false).await.unwrap();
let update =
Update::Bulk(smallvec![mk_test_payload(), mk_test_payload()]);
eprintln!("SENDING PAYLOAD");
gate_clone.update_data(update).await;
// The target must be notified within three seconds.
eprintln!("WAITING FOR PAYLOAD TO BE RECEIVED BY THE TEST TARGET");
let timeout = Box::pin(tokio::time::sleep(Duration::from_secs(3)));
let notified = Box::pin(notify.notified());
assert!(matches!(select(timeout, notified).await, Either::Right(..)));
let metrics = get_testable_metrics_snapshot(&gate_clone.metrics());
assert_eq!(metrics.with_name::<usize>("num_updates"), 2);
assert_eq!(metrics.with_name::<usize>("num_dropped_updates"), 1);
assert_eq!(
metrics
.with_name::<DateTime<Utc>>("last_update")
.round_subsecs(0),
Utc::now().round_subsecs(0)
);
let since_last_update = metrics.with_name::<i64>("since_last_update");
assert_eq!(metrics.with_name::<usize>("update_set_size"), 2);
eprintln!("WAITING TO CHECK THAT SINCE_LAST_UPDATE METRIC UPDATES");
tokio::time::sleep(Duration::from_secs(2)).await;
let metrics = get_testable_metrics_snapshot(&gate_clone.metrics());
let new_since_last_update =
metrics.with_name::<i64>("since_last_update");
assert!(new_since_last_update > since_last_update);
}
/// Checks that a clone reports termination after its terminated parent gate
/// has been dropped.
#[tokio::test(flavor = "multi_thread")]
async fn gate_clones_terminate_when_parent_gate_is_dropped() {
let (gate, agent) = Gate::new(10);
let gate_clone = gate.clone();
// The clone registers itself asynchronously, so the parent does not know
// about it until it has processed the `AttachClone` command.
eprintln!("CHECKING GATE DOES NOT YET HAVE CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if clone_senders.is_empty()));
let _ = gate
.process_until(tokio::time::sleep(Duration::from_secs(1)))
.await;
eprintln!("CHECKING GATE HAS CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if !clone_senders.is_empty()));
eprintln!("SENDING TERMINATION COMMAND");
agent.terminate().await;
eprintln!("CHECKING GATE IS TERMINATED");
assert_eq!(gate.process().await, Err(Terminated));
drop(gate);
eprintln!("CHECKING GATE CLONE IS TERMINATED");
assert_eq!(gate_clone.process().await, Err(Terminated));
eprintln!("GATE AND GATE CLONE ARE TERMINATED");
}
/// Checks that terminating the parent gate forwards termination to its
/// clone.
#[tokio::test(flavor = "multi_thread")]
async fn gate_clones_receive_termination_signal() {
enable_logging("trace");
let (gate, agent) = Gate::new(10);
let gate_clone = gate.clone();
eprintln!("CHECKING GATE DOES NOT YET HAVE CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if clone_senders.is_empty()));
// Let the parent process the clone's `AttachClone` command.
let _ = gate
.process_until(tokio::time::sleep(Duration::from_secs(1)))
.await;
eprintln!("CHECKING GATE HAS CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if !clone_senders.is_empty()));
eprintln!("SENDING TERMINATION COMMAND");
agent.terminate().await;
eprintln!("CHECKING GATE IS TERMINATED");
assert_eq!(gate.process().await, Err(Terminated));
eprintln!("CHECKING GATE CLONE IS TERMINATED");
assert_eq!(gate_clone.process().await, Err(Terminated));
eprintln!("GATE AND GATE CLONE ARE TERMINATED");
}
/// Checks that dropping a clone removes it from the parent gate's clone
/// registry.
#[tokio::test(flavor = "multi_thread")]
async fn gate_parent_cleans_up_when_clone_terminates() {
let (gate, _agent) = Gate::new(10);
let gate_clone = gate.clone();
eprintln!("CHECKING GATE DOES NOT YET HAVE CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if clone_senders.is_empty()));
// Let the parent process the clone's `AttachClone` command.
let _ = gate
.process_until(tokio::time::sleep(Duration::from_secs(1)))
.await;
eprintln!("CHECKING GATE HAS CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if !clone_senders.is_empty()));
// Dropping the clone sends `DetachClone` to the parent.
eprintln!("DROPPING CLONED GATE");
drop(gate_clone);
eprintln!("PROCESS COMMANDS IN PARENT GATE");
gate.wait(1).await.unwrap();
eprintln!("CHECKING GATE HAS NO CLONE SENDER");
assert!(matches!(&gate.state, GateState::Normal(NormalGateState {
clone_senders, .. }) if clone_senders.is_empty()));
}
}