linera-core 0.15.17

The core Linera protocol, including client and server logic, node synchronization, etc.
Documentation
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use linera_base::identifiers::ChainId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::trace;

use crate::worker;

// TODO(#2171): replace this with a Tokio broadcast channel

/// A `ChannelNotifier` keeps, for every chain ID, the list of channel senders
/// belonging to the clients subscribed to that chain.
///
/// Senders are registered through `add_sender`, `subscribe` or
/// `subscribe_with_ack`. A sender whose receiving half has been closed is
/// evicted the next time `notify_chain` fails to deliver to it, and a chain
/// entry is removed once none of its senders remain open.
pub struct ChannelNotifier<N> {
    // Per-chain subscriber list. Updates replace the whole `Vec` rather than
    // mutating it in place.
    inner: papaya::HashMap<ChainId, Vec<UnboundedSender<N>>>,
}

impl<N> Default for ChannelNotifier<N> {
    fn default() -> Self {
        Self {
            inner: papaya::HashMap::default(),
        }
    }
}

impl<N> ChannelNotifier<N> {
    /// Registers `sender` as a subscriber of every chain in `chain_ids`.
    ///
    /// A clone of `sender` is appended to the subscriber list of each chain; the
    /// list is created if the chain has no entry yet. The IDs are not
    /// deduplicated, so a chain ID listed twice registers the sender twice.
    pub fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<N>) {
        let pinned = self.inner.pin();
        for id in chain_ids {
            // Both closures only clone senders and have no other side effects,
            // so it is harmless if the map needs to run them more than once.
            pinned.update_or_insert_with(
                id,
                |senders| senders.iter().cloned().chain([sender.clone()]).collect(),
                || vec![sender.clone()],
            );
        }
    }

    /// Creates a subscription to the given chain IDs.
    ///
    /// A fresh unbounded channel is created, its sending half is registered for
    /// every chain in `chain_ids`, and the receiving half is returned to the
    /// caller.
    pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        self.add_sender(chain_ids, &tx);
        rx
    }

    /// Creates a subscription to the given chain IDs and posts `ack` as the
    /// first message on the returned receiver.
    ///
    /// The ACK is queued *before* the sender is registered, so a concurrent
    /// `notify_chain` call can never slip a notification in front of it.
    pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<N> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // `rx` is still alive in this scope, so the channel is open and the
        // send cannot fail.
        tx.send(ack)
            .expect("pushing to a new channel should succeed");
        self.add_sender(chain_ids, &tx);
        rx
    }
}

impl<N> ChannelNotifier<N>
where
    N: Clone,
{
    /// Notifies all the clients waiting for a notification from a given chain.
    ///
    /// Every sender registered for `chain_id` receives its own clone of
    /// `notification`. If at least one delivery fails, the senders whose
    /// channel is closed are pruned from the chain's entry, and the entry
    /// itself is removed when no open sender is left. Nothing happens (beyond
    /// a trace message) if the chain has no subscribers.
    pub fn notify_chain(&self, chain_id: &ChainId, notification: &N) {
        let pinned = self.inner.pin();

        // Read the senders outside of `compute`: papaya's `compute` may call
        // its closure multiple times on CAS contention, so `send()` must not
        // happen inside it. The reference stays valid for as long as `pinned`
        // is alive, so the subscriber list does not need to be cloned.
        let Some(senders) = pinned.get(chain_id) else {
            trace!("Chain {chain_id} has no subscribers.");
            return;
        };

        // Send notifications (side effect — must happen exactly once per sender).
        let mut has_dead = false;
        for sender in senders {
            has_dead |= sender.send(notification.clone()).is_err();
        }
        if !has_dead {
            return;
        }

        // Clean up dead senders. The closure is pure: `is_closed()` is
        // idempotent and has no side effects, so retries are safe. It works on
        // the entry's *current* value, so senders added concurrently are kept.
        let outcome = pinned.compute(*chain_id, |entry| {
            let Some((_key, current_senders)) = entry else {
                // The entry was removed concurrently; nothing left to clean.
                return papaya::Operation::Abort(());
            };
            let live: Vec<_> = current_senders
                .iter()
                .filter(|s| !s.is_closed())
                .cloned()
                .collect();
            if live.is_empty() {
                papaya::Operation::Remove
            } else {
                papaya::Operation::Insert(live)
            }
        });

        // Log outside of the retriable closure so the message is emitted once,
        // and only if the entry really was removed.
        if matches!(outcome, papaya::Compute::Removed(..)) {
            trace!("No more subscribers for chain {chain_id}. Removing entry.");
        }
    }
}

/// A sink that accepts batches of [`worker::Notification`]s.
///
/// Implementations must be cloneable, sendable across threads and free of
/// non-`'static` borrows.
pub trait Notifier: Clone + Send + 'static {
    /// Hands the given notifications over to this notifier.
    fn notify(&self, notifications: &[worker::Notification]);
}

impl Notifier for Arc<ChannelNotifier<worker::Notification>> {
    /// Forwards each notification, in slice order, to the subscribers of the
    /// chain named by its `chain_id` field.
    fn notify(&self, notifications: &[worker::Notification]) {
        notifications
            .iter()
            .for_each(|item| self.notify_chain(&item.chain_id, item));
    }
}

/// The unit type acts as a notifier that discards everything it is given.
impl Notifier for () {
    /// Does nothing; the notifications are ignored.
    fn notify(&self, _notifications: &[worker::Notification]) {}
}

#[cfg(with_testing)]
impl Notifier for Arc<std::sync::Mutex<Vec<worker::Notification>>> {
    /// Appends a clone of every notification to the shared vector.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn notify(&self, notifications: &[worker::Notification]) {
        self.lock().unwrap().extend_from_slice(notifications);
    }
}

#[cfg(test)]
pub mod tests {
    use std::{
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        time::{Duration, Instant},
    };

    use linera_execution::test_utils::dummy_chain_description;

    use super::*;

    /// Two threads notify two different chains concurrently; every subscriber
    /// must receive exactly the notifications of the chains it subscribed to.
    #[test]
    fn test_concurrent() {
        let notifier = ChannelNotifier::default();

        let chain_a = dummy_chain_description(0).id();
        let chain_b = dummy_chain_description(1).id();

        // Counters of the messages received by each subscriber.
        let a_rec = Arc::new(AtomicUsize::new(0));
        let b_rec = Arc::new(AtomicUsize::new(0));
        let a_b_rec = Arc::new(AtomicUsize::new(0));

        let mut rx_a = notifier.subscribe(vec![chain_a]);
        let mut rx_b = notifier.subscribe(vec![chain_b]);
        let mut rx_a_b = notifier.subscribe(vec![chain_a, chain_b]);

        let a_rec_clone = a_rec.clone();
        let b_rec_clone = b_rec.clone();
        let a_b_rec_clone = a_b_rec.clone();

        let notifier = Arc::new(notifier);

        // Receiver threads: count messages until the channel is closed.
        std::thread::spawn(move || {
            while rx_a.blocking_recv().is_some() {
                a_rec_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        std::thread::spawn(move || {
            while rx_b.blocking_recv().is_some() {
                b_rec_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        std::thread::spawn(move || {
            while rx_a_b.blocking_recv().is_some() {
                a_b_rec_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        const NOTIFICATIONS_A: usize = 500;
        const NOTIFICATIONS_B: usize = 700;

        let a_notifier = notifier.clone();
        let handle_a = std::thread::spawn(move || {
            for _ in 0..NOTIFICATIONS_A {
                a_notifier.notify_chain(&chain_a, &());
            }
        });

        let handle_b = std::thread::spawn(move || {
            for _ in 0..NOTIFICATIONS_B {
                notifier.notify_chain(&chain_b, &());
            }
        });

        // finish sending all the messages
        handle_a.join().unwrap();
        handle_b.join().unwrap();

        // Wait for the receivers to drain their channels. Poll with a generous
        // deadline instead of sleeping for a fixed time, so the test neither
        // flakes on a slow machine nor waits longer than necessary.
        let all_received = || {
            a_rec.load(Ordering::Relaxed) == NOTIFICATIONS_A
                && b_rec.load(Ordering::Relaxed) == NOTIFICATIONS_B
                && a_b_rec.load(Ordering::Relaxed) == NOTIFICATIONS_A + NOTIFICATIONS_B
        };
        let deadline = Instant::now() + Duration::from_secs(5);
        while !all_received() && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(1));
        }

        assert_eq!(a_rec.load(Ordering::Relaxed), NOTIFICATIONS_A);
        assert_eq!(b_rec.load(Ordering::Relaxed), NOTIFICATIONS_B);
        assert_eq!(
            a_b_rec.load(Ordering::Relaxed),
            NOTIFICATIONS_A + NOTIFICATIONS_B
        );
    }

    /// Closed receivers must be pruned on the next notification for a chain,
    /// and a chain entry must disappear once all its senders are closed.
    #[test]
    fn test_eviction() {
        let notifier = ChannelNotifier::default();

        let chain_a = dummy_chain_description(0).id();
        let chain_b = dummy_chain_description(1).id();
        let chain_c = dummy_chain_description(2).id();
        let chain_d = dummy_chain_description(3).id();

        // Chain A -> Notify A, Notify B
        // Chain B -> Notify A, Notify B
        // Chain C -> Notify C
        // Chain D -> Notify A, Notify B, Notify C, Notify D

        let mut rx_a = notifier.subscribe(vec![chain_a, chain_b, chain_d]);
        let mut rx_b = notifier.subscribe(vec![chain_a, chain_b, chain_d]);
        let mut rx_c = notifier.subscribe(vec![chain_c, chain_d]);
        let mut rx_d = notifier.subscribe(vec![chain_d]);

        assert_eq!(notifier.inner.len(), 4);

        // C was the only subscriber of chain C: the entry goes away.
        rx_c.close();
        notifier.notify_chain(&chain_c, &());
        assert_eq!(notifier.inner.len(), 3);

        // Chain A still has B subscribed: the entry stays.
        rx_a.close();
        notifier.notify_chain(&chain_a, &());
        assert_eq!(notifier.inner.len(), 3);

        // Both subscribers of chain B are now closed: the entry goes away.
        rx_b.close();
        notifier.notify_chain(&chain_b, &());
        assert_eq!(notifier.inner.len(), 2);

        // Chain A's last subscriber (B) was closed above.
        notifier.notify_chain(&chain_a, &());
        assert_eq!(notifier.inner.len(), 1);

        // D was the last open subscriber of chain D.
        rx_d.close();
        notifier.notify_chain(&chain_d, &());
        assert_eq!(notifier.inner.len(), 0);
    }
}