use crate::{Delta, Index, ResourceMap, Snapshot};
use bytes::Bytes;
use std::sync::Arc;
use tokio::sync::watch;
pub struct Plane {
index: Index,
snapshot: Arc<Snapshot>,
notify: watch::Sender<()>,
}
impl Plane {
pub fn new() -> (Self, watch::Receiver<()>) {
let (notify, rx) = watch::channel(());
let plane = Self {
index: Index::new(),
snapshot: Arc::new(Snapshot::new()),
notify,
};
(plane, rx)
}
pub fn snapshot(&self) -> Arc<Snapshot> {
Arc::clone(&self.snapshot)
}
pub fn apply(&mut self, name: &str, bytes: Bytes) -> bool {
if !self.index.update(name, &bytes) {
return false;
}
let mut next = (**self.snapshot.load()).clone();
next.insert(name.to_owned(), bytes);
self.snapshot.store(Arc::new(next));
self.notify.send(()).ok();
true
}
pub fn remove(&mut self, name: &str) -> bool {
if !self.index.remove(name) {
return false;
}
let mut next = (**self.snapshot.load()).clone();
next.remove(name);
self.snapshot.store(Arc::new(next));
self.notify.send(()).ok();
true
}
pub fn reconcile(&mut self, map: ResourceMap) -> Delta {
let delta = self.index.reconcile(&map);
if !delta.is_empty() {
self.snapshot.store(Arc::new(map));
self.notify.send(()).ok();
}
delta
}
}
#[mutants::skip]
fn _assert_plane_is_send() {
fn check<T: Send>() {}
check::<Plane>();
}
#[cfg(test)]
mod tests {
use super::*;
fn bytes(s: &'static str) -> Bytes {
Bytes::from_static(s.as_bytes())
}
#[test]
fn apply_new_resource_visible_in_snapshot() {
let (mut plane, _rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
let guard = plane.snapshot().load();
assert_eq!(guard.get("svc-a").unwrap(), &bytes("v1"));
}
#[test]
fn apply_same_content_is_noop() {
let (mut plane, _rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
assert!(!plane.apply("svc-a", bytes("v1")));
}
#[test]
fn apply_mutated_content_updates_snapshot() {
let (mut plane, _rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
plane.apply("svc-a", bytes("v2"));
assert_eq!(
plane.snapshot().load().get("svc-a").unwrap(),
&bytes("v2"),
);
}
#[test]
fn remove_existing_drops_from_snapshot() {
let (mut plane, _rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
assert!(plane.remove("svc-a"));
assert!(plane.snapshot().load().get("svc-a").is_none());
}
#[test]
fn remove_nonexistent_is_noop() {
let (mut plane, _rx) = Plane::new();
assert!(!plane.remove("svc-a"));
}
#[test]
fn remove_then_apply_same_bytes_is_changed() {
let (mut plane, _rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
plane.remove("svc-a");
assert!(plane.apply("svc-a", bytes("v1")));
}
#[test]
fn reconcile_replaces_snapshot() {
let (mut plane, _rx) = Plane::new();
let mut map = ResourceMap::new();
map.insert("svc-a".to_owned(), bytes("v1"));
map.insert("svc-b".to_owned(), bytes("v2"));
plane.reconcile(map);
let guard = plane.snapshot().load();
assert_eq!(guard.get("svc-a").unwrap(), &bytes("v1"));
assert_eq!(guard.get("svc-b").unwrap(), &bytes("v2"));
}
#[test]
fn reconcile_unchanged_returns_empty_delta() {
let (mut plane, _rx) = Plane::new();
let mut map = ResourceMap::new();
map.insert("svc-a".to_owned(), bytes("v1"));
plane.reconcile(map.clone());
let delta = plane.reconcile(map);
assert!(delta.is_empty());
}
#[test]
fn apply_fires_watch() {
let (mut plane, rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
assert!(rx.has_changed().unwrap());
}
#[test]
fn unchanged_apply_does_not_fire_watch() {
let (mut plane, mut rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
let _ = rx.borrow_and_update();
plane.apply("svc-a", bytes("v1"));
assert!(!rx.has_changed().unwrap());
}
#[test]
fn remove_fires_watch() {
let (mut plane, mut rx) = Plane::new();
plane.apply("svc-a", bytes("v1"));
let _ = rx.borrow_and_update();
plane.remove("svc-a");
assert!(rx.has_changed().unwrap());
}
#[test]
fn reconcile_with_changes_fires_watch() {
let (mut plane, mut rx) = Plane::new();
let mut map = ResourceMap::new();
map.insert("svc-a".to_owned(), bytes("v1"));
plane.reconcile(map.clone());
let _ = rx.borrow_and_update();
map.insert("svc-a".to_owned(), bytes("v2"));
plane.reconcile(map);
assert!(rx.has_changed().unwrap());
}
#[test]
fn reconcile_without_changes_is_silent() {
let (mut plane, mut rx) = Plane::new();
let mut map = ResourceMap::new();
map.insert("svc-a".to_owned(), bytes("v1"));
plane.reconcile(map.clone());
let _ = rx.borrow_and_update();
plane.reconcile(map);
assert!(!rx.has_changed().unwrap());
}
#[test]
fn snapshot_handle_sees_updates_after_issue() {
let (mut plane, _rx) = Plane::new();
let snap = plane.snapshot();
plane.apply("svc-a", bytes("v1"));
assert_eq!(snap.load().get("svc-a").unwrap(), &bytes("v1"));
plane.apply("svc-a", bytes("v2"));
assert_eq!(snap.load().get("svc-a").unwrap(), &bytes("v2"));
}
}