use holochain_types::app::InstalledAppId;
use holochain_types::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::broadcast;
/// Capacity handed to `broadcast::channel` whenever a new per-app signal channel is created.
const SIGNAL_BUFFER_SIZE: usize = 64;
/// Keeps one broadcast sender of [`Signal`]s per installed app.
///
/// The map of senders sits behind an `Arc`, so cloning an `AppBroadcast` yields another
/// handle onto the same set of channels rather than an independent copy.
#[derive(Debug, Clone)]
pub struct AppBroadcast {
/// Sending half of each app's channel, keyed by the app's installed id.
channels: Arc<parking_lot::Mutex<HashMap<InstalledAppId, broadcast::Sender<Signal>>>>,
}
impl AppBroadcast {
    /// Builds a broadcaster with no channels registered yet.
    pub(crate) fn new() -> Self {
        let registry = HashMap::new();
        Self {
            channels: Arc::new(parking_lot::Mutex::new(registry)),
        }
    }

    /// Returns a sender for the channel of `installed_app_id`.
    ///
    /// If the app has no channel yet, one is created with [`SIGNAL_BUFFER_SIZE`] capacity and
    /// its sender is stored. The returned handle is always a clone of the stored sender.
    pub(crate) fn create_send_handle(
        &self,
        installed_app_id: InstalledAppId,
    ) -> broadcast::Sender<Signal> {
        let mut channels = self.channels.lock();
        channels
            .entry(installed_app_id)
            // Only the sending half is kept; receivers are obtained through `subscribe`.
            .or_insert_with(|| broadcast::channel(SIGNAL_BUFFER_SIZE).0)
            .clone()
    }

    /// Returns a receiver for the channel of `installed_app_id`.
    ///
    /// If the app already has a channel, a new subscription to the stored sender is returned.
    /// Otherwise a channel with [`SIGNAL_BUFFER_SIZE`] capacity is created, its sender is
    /// stored, and the receiver created together with it is returned.
    pub(crate) fn subscribe(
        &self,
        installed_app_id: InstalledAppId,
    ) -> broadcast::Receiver<Signal> {
        let mut channels = self.channels.lock();
        let slot = match channels.entry(installed_app_id) {
            Entry::Occupied(existing) => return existing.get().subscribe(),
            Entry::Vacant(slot) => slot,
        };

        // First use for this app: register the sender and hand back the paired receiver.
        let (sender, receiver) = broadcast::channel(SIGNAL_BUFFER_SIZE);
        slot.insert(sender);
        receiver
    }

    /// Removes the stored sender of every app whose id is not in `installed_apps`.
    pub(crate) fn retain(&self, installed_apps: HashSet<InstalledAppId>) {
        let mut channels = self.channels.lock();
        channels.retain(|app_id, _sender| installed_apps.contains(app_id));
    }

    /// Test-only helper listing the app ids that currently have a stored sender.
    #[cfg(test)]
    fn keys(&self) -> Vec<InstalledAppId> {
        let channels = self.channels.lock();
        let mut app_ids = Vec::with_capacity(channels.len());
        app_ids.extend(channels.keys().cloned());
        app_ids
    }
}
impl Default for AppBroadcast {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ::fixt::prelude::*;
    use hdk::prelude::CellIdFixturator;
    use hdk::prelude::ZomeNameFixturator;
    use holochain_zome_types::signal::AppSignal;

    /// Builds an app signal from a fixture cell id and zome name with an empty payload.
    fn fixture_signal() -> Signal {
        Signal::App {
            cell_id: fixt!(CellId),
            zome_name: fixt!(ZomeName),
            signal: AppSignal::new(ExternIO::from(vec![])),
        }
    }

    /// A sender created before the subscription delivers to that subscriber.
    #[tokio::test]
    async fn create_send_handle_and_broadcast() {
        let broadcaster = AppBroadcast::new();
        let app_id: InstalledAppId = "test".into();

        let sender = broadcaster.create_send_handle(app_id.clone());
        let mut receiver = broadcaster.subscribe(app_id.clone());

        let sent = fixture_signal();
        sender.send(sent.clone()).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(sent, received);
    }

    /// A sender created after the subscription delivers to that subscriber.
    #[tokio::test]
    async fn subscribe_and_broadcast() {
        let broadcaster = AppBroadcast::new();
        let app_id: InstalledAppId = "test".into();

        let mut receiver = broadcaster.subscribe(app_id.clone());
        let sender = broadcaster.create_send_handle(app_id.clone());

        let sent = fixture_signal();
        sender.send(sent.clone()).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(sent, received);
    }

    /// Every subscriber of an app receives what any of that app's senders sends.
    #[tokio::test]
    async fn multiple_senders_and_subscribers() {
        let broadcaster = AppBroadcast::new();
        let app_id: InstalledAppId = "test".into();

        let first_sender = broadcaster.create_send_handle(app_id.clone());
        let mut first_receiver = broadcaster.subscribe(app_id.clone());
        let second_sender = broadcaster.create_send_handle(app_id.clone());
        let mut second_receiver = broadcaster.subscribe(app_id.clone());

        let first_signal = fixture_signal();
        first_sender.send(first_signal.clone()).unwrap();
        let first_seen_by_first = first_receiver.recv().await.unwrap();
        let first_seen_by_second = second_receiver.recv().await.unwrap();
        assert_eq!(first_signal, first_seen_by_first);
        assert_eq!(first_signal, first_seen_by_second);

        let second_signal = fixture_signal();
        second_sender.send(second_signal.clone()).unwrap();
        let second_seen_by_first = first_receiver.recv().await.unwrap();
        let second_seen_by_second = second_receiver.recv().await.unwrap();
        assert_eq!(second_signal, second_seen_by_first);
        assert_eq!(second_signal, second_seen_by_second);
    }

    /// `retain` removes the entries of apps missing from the given set.
    #[tokio::test]
    async fn clean_up_unused_senders() {
        let broadcaster = AppBroadcast::new();

        let kept_app_id: InstalledAppId = "test 1".into();
        let _kept_sender = broadcaster.create_send_handle(kept_app_id.clone());
        let removed_app_id: InstalledAppId = "test 2".into();
        let _removed_sender = broadcaster.create_send_handle(removed_app_id.clone());
        assert_eq!(2, broadcaster.keys().len());

        let still_installed: HashSet<InstalledAppId> =
            std::iter::once(kept_app_id.clone()).collect();
        broadcaster.retain(still_installed);

        assert_eq!(1, broadcaster.keys().len());
        assert_eq!(vec![kept_app_id], broadcaster.keys());
    }
}