use std::{
net::{IpAddr, SocketAddr},
sync::{Arc, Weak},
time::Instant,
};
use fixedbitset::FixedBitSet;
use indexmap::IndexSet;
use jwt::SignWithKey;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use serde::{Deserialize, Serialize};
use snops_common::{
lasso::Spur,
rpc::control::agent::AgentServiceClient,
state::{AgentId, AgentModeOptions, AgentState, AgentStatus, EnvId, NodeState, PortConfig},
INTERN,
};
use super::{AgentClient, AgentFlags, PendingAgentReconcile};
use crate::server::jwt::{Claims, JWT_SECRET};
/// Marker token shared through an `Arc` to represent a claim on an agent.
///
/// The agent keeps one `Arc<Busy>` per kind of claim; a claim counts as held
/// while any additional strong reference to that `Arc` exists.
#[derive(Debug)]
pub struct Busy;
/// Bookkeeping record for a single agent: identity, connection, desired
/// state, capabilities and the network details reported for it.
#[derive(Debug)]
pub struct Agent {
/// Identifier of this agent.
pub(crate) id: AgentId,
/// JWT claims (agent id plus nonce) that `sign_jwt` signs.
pub(crate) claims: Claims,
/// Whether the agent is online (with its RPC client) or offline.
pub(crate) connection: AgentConnection,
/// Current agent state, e.g. inventory or a node within an environment.
pub(crate) state: AgentState,
/// Status information for the agent; the constructors start it at its default.
pub(crate) status: AgentStatus,
/// Flags supplied for the agent (labels, mode options, local private key).
pub(crate) flags: AgentFlags,
/// Claim token for compute work; held while its strong count exceeds one.
pub(crate) compute_claim: Arc<Busy>,
/// Claim token for environment use; held while its strong count exceeds one.
pub(crate) env_claim: Arc<Busy>,
/// Port configuration (bft, node, rest, metrics), if known.
pub(crate) ports: Option<PortConfig>,
/// External/internal addresses of the agent, if known.
pub(crate) addrs: Option<AgentAddrs>,
}
impl Agent {
pub fn new(rpc: AgentServiceClient, id: AgentId, flags: AgentFlags) -> Self {
Self {
id,
flags,
compute_claim: Arc::new(Busy),
env_claim: Arc::new(Busy),
claims: Claims {
id,
nonce: ChaChaRng::from_entropy().gen(),
},
connection: AgentConnection::Online(rpc),
state: Default::default(),
status: Default::default(),
ports: None,
addrs: None,
}
}
pub(crate) fn from_components(
claims: Claims,
state: AgentState,
flags: AgentFlags,
ports: Option<PortConfig>,
addrs: Option<AgentAddrs>,
) -> Self {
Self {
id: claims.id,
flags,
compute_claim: Arc::new(Busy),
env_claim: Arc::new(Busy),
claims,
connection: AgentConnection::Offline {
since: Instant::now(),
},
status: Default::default(),
state,
ports,
addrs,
}
}
pub fn is_connected(&self) -> bool {
matches!(self.connection, AgentConnection::Online(_))
}
pub fn is_node_capable(&self) -> bool {
if !self.is_connected() {
return false;
};
self.addrs
.as_ref()
.map(AgentAddrs::is_some)
.unwrap_or_default()
}
pub fn has_labels(&self, labels: &IndexSet<Spur>) -> bool {
labels.is_empty() || self.flags.labels.intersection(labels).count() == labels.len()
}
pub fn has_label(&self, label: Spur) -> bool {
self.flags.labels.contains(&label)
}
pub fn has_label_str(&self, label: &str) -> bool {
INTERN
.get(label)
.map_or(false, |label| self.flags.labels.contains(&label))
}
pub fn str_labels(&self) -> IndexSet<&str> {
self.flags
.labels
.iter()
.map(|s| INTERN.resolve(s))
.collect()
}
pub fn mask(&self, labels: &[Spur]) -> FixedBitSet {
self.flags.mask(labels)
}
pub fn is_inventory(&self) -> bool {
matches!(self.state, AgentState::Inventory)
}
pub fn can_compute(&self) -> bool {
self.is_inventory() && self.flags.mode.compute && !self.is_compute_claimed()
}
pub fn is_compute_claimed(&self) -> bool {
Arc::strong_count(&self.compute_claim) > 1
}
pub fn make_busy(&self) -> Arc<Busy> {
Arc::clone(&self.compute_claim)
}
pub fn get_compute_claim(&self) -> Weak<Busy> {
Arc::downgrade(&self.compute_claim)
}
pub fn is_env_claimed(&self) -> bool {
Arc::strong_count(&self.env_claim) > 1
}
pub fn get_env_claim(&self) -> Weak<Busy> {
Arc::downgrade(&self.env_claim)
}
pub fn env(&self) -> Option<EnvId> {
match &self.state {
AgentState::Node(id, _) => Some(*id),
_ => None,
}
}
pub fn id(&self) -> AgentId {
self.id
}
pub fn state(&self) -> &AgentState {
&self.state
}
pub fn modes(&self) -> AgentModeOptions {
self.flags.mode
}
pub fn claims(&self) -> &Claims {
&self.claims
}
pub fn sign_jwt(&self) -> String {
self.claims.to_owned().sign_with_key(&*JWT_SECRET).unwrap()
}
pub fn rpc(&self) -> Option<&AgentServiceClient> {
match self.connection {
AgentConnection::Online(ref rpc) => Some(rpc),
_ => None,
}
}
pub fn client_owned(&self) -> Option<AgentClient> {
match self.connection {
AgentConnection::Online(ref rpc) => Some(AgentClient(rpc.to_owned())),
_ => None,
}
}
pub fn mark_disconnected(&mut self) {
self.connection = AgentConnection::Offline {
since: Instant::now(),
};
}
pub fn mark_connected(&mut self, client: AgentServiceClient, flags: AgentFlags) {
self.connection = AgentConnection::Online(client);
self.flags = flags;
}
pub fn set_state(&mut self, state: AgentState) {
self.state = state;
}
pub fn set_ports(&mut self, ports: PortConfig) {
self.ports = Some(ports);
}
pub fn bft_port(&self) -> u16 {
self.ports.as_ref().map(|p| p.bft).unwrap_or_default()
}
pub fn node_port(&self) -> u16 {
self.ports.as_ref().map(|p| p.node).unwrap_or_default()
}
pub fn rest_port(&self) -> u16 {
self.ports.as_ref().map(|p| p.rest).unwrap_or_default()
}
pub fn rest_addr(&self) -> Option<SocketAddr> {
Some(SocketAddr::new(self.addrs()?.usable()?, self.rest_port()))
}
pub fn metrics_port(&self) -> u16 {
self.ports.as_ref().map(|p| p.metrics).unwrap_or_default()
}
pub fn has_local_pk(&self) -> bool {
self.flags.local_pk
}
pub fn addrs(&self) -> Option<&AgentAddrs> {
self.addrs.as_ref()
}
pub fn set_addrs(&mut self, external: Option<IpAddr>, internal: Vec<IpAddr>) {
self.addrs = Some(AgentAddrs { external, internal });
}
pub fn map_to_reconcile<F>(&self, f: F) -> PendingAgentReconcile
where
F: Fn(NodeState) -> NodeState,
{
(
self.id(),
self.client_owned(),
match &self.state {
AgentState::Node(id, state) => AgentState::Node(*id, Box::new(f(*state.clone()))),
s => s.clone(),
},
)
}
pub fn filter_map_to_reconcile<F>(&self, f: F) -> Option<PendingAgentReconcile>
where
F: Fn(NodeState) -> Option<NodeState>,
{
Some((
self.id(),
self.client_owned(),
match &self.state {
AgentState::Node(id, state) => AgentState::Node(*id, Box::new(f(*state.clone())?)),
_ => return None,
},
))
}
}
/// Connection status of an agent.
#[derive(Debug, Clone)]
pub enum AgentConnection {
/// The agent is connected; holds the RPC client used to reach it.
Online(AgentServiceClient),
/// The agent is not connected; `since` is when it was marked offline.
Offline { since: Instant },
}
/// Addresses known for an agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentAddrs {
/// External address of the agent, if one is known.
pub external: Option<IpAddr>,
/// Internal addresses of the agent; may be empty.
pub internal: Vec<IpAddr>,
}
impl AgentAddrs {
    /// Picks the address to use for reaching the agent: the external address
    /// when present, otherwise the first internal address, otherwise `None`.
    pub fn usable(&self) -> Option<IpAddr> {
        match self.external {
            Some(ip) => Some(ip),
            None => self.internal.first().copied(),
        }
    }

    /// Reports whether at least one address (external or internal) is known.
    pub fn is_some(&self) -> bool {
        !(self.external.is_none() && self.internal.is_empty())
    }
}