bookkeeper-client 0.2.1

Async rust client for Apache BookKeeper
Documentation
use std::collections::HashMap;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::{Arc, Mutex};

use ignore_result::Ignore;
use tokio::sync::oneshot;
use tokio::{select, task};

use crate::client::errors::BkError;
use crate::client::BookieId;
use crate::meta::types::{BookieRegistrationClient, BookieServiceInfo, BookieUpdate, BookieUpdateStream};

#[derive(Clone)]
struct BookieRegistryState {
    version: i64,
    writable_bookies: HashMap<BookieId, BookieServiceInfo>,
    readable_bookies: HashMap<BookieId, BookieServiceInfo>,
}

#[derive(Clone)]
pub struct BookieRegistrySnapshot {
    state: Arc<BookieRegistryState>,
}

struct BookieRegistryLifetime(MaybeUninit<oneshot::Sender<()>>);

impl Drop for BookieRegistryLifetime {
    fn drop(&mut self) {
        let sender = unsafe { ptr::read(self.0.as_ptr()) };
        sender.send(()).ignore();
    }
}

#[derive(Clone)]
pub struct BookieRegistry {
    snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
    _lifetime: Arc<BookieRegistryLifetime>,
}

impl BookieRegistry {
    pub fn with_bookies(bookies: &str) -> Result<BookieRegistry, BkError> {
        let result: Result<HashMap<BookieId, BookieServiceInfo>, BkError> = bookies
            .split(',')
            .map(BookieId::new)
            .map(|bookie_id| match BookieServiceInfo::from_legacy(bookie_id) {
                Ok(bookie) => Ok((bookie.bookie_id.clone(), bookie)),
                Err(err) => Err(err),
            })
            .collect();
        let bookies = result?;
        let (sender, _) = oneshot::channel();
        let snapshot = BookieRegistrySnapshot {
            state: Arc::new(BookieRegistryState {
                version: 0,
                writable_bookies: bookies,
                readable_bookies: HashMap::new(),
            }),
        };
        let registry = BookieRegistry {
            snapshot: Arc::new(Mutex::new(snapshot)),
            _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))),
        };
        Ok(registry)
    }

    pub fn snapshot(&self) -> BookieRegistrySnapshot {
        let state = Self::extract_state(&self.snapshot);
        BookieRegistrySnapshot { state }
    }

    pub fn update(&self, snapshot: &mut BookieRegistrySnapshot) {
        if let Some(state) = self.get_newer_state(snapshot.state.version) {
            snapshot.state = state;
        }
    }

    fn get_newer_state(&self, version: i64) -> Option<Arc<BookieRegistryState>> {
        let snapshot = self.snapshot.lock().unwrap();
        if snapshot.state.version > version {
            return Some(snapshot.state.clone());
        }
        None
    }

    fn extract_state(snapshot: &Mutex<BookieRegistrySnapshot>) -> Arc<BookieRegistryState> {
        return snapshot.lock().unwrap().state.clone();
    }

    fn update_bookie(bookies: &mut HashMap<BookieId, BookieServiceInfo>, update: BookieUpdate) {
        match update {
            BookieUpdate::Remove(bookie_id) => {
                bookies.remove(&bookie_id);
            },
            BookieUpdate::Add(bookie) => {
                bookies.insert(bookie.bookie_id.clone(), bookie);
            },
            BookieUpdate::Reconstruction(new_bookies) => {
                let mut new_bookies: HashMap<BookieId, BookieServiceInfo> =
                    new_bookies.into_iter().map(|bookie| (bookie.bookie_id.clone(), bookie)).collect();
                std::mem::swap(bookies, &mut new_bookies);
            },
        };
    }

    fn start(
        snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
        mut receiver: oneshot::Receiver<()>,
        mut writable_updates: Box<dyn BookieUpdateStream>,
        mut readable_updates: Box<dyn BookieUpdateStream>,
    ) {
        task::spawn(async move {
            loop {
                select! {
                    _ = &mut receiver => {
                        break;
                    },
                    update = writable_updates.next() => {
                        let Ok(result) = update else {
                            continue;
                        };
                        let mut state = Self::extract_state(&snapshot).as_ref().clone();
                        state.version += 1;
                        Self::update_bookie(&mut state.writable_bookies, result);
                        let state = Arc::new(state);
                        let mut snapshot = snapshot.lock().unwrap();
                        snapshot.state = state;
                    },
                    update = readable_updates.next() => {
                        let Ok(result) = update else {
                            continue;
                        };
                        let mut state = Self::extract_state(&snapshot).as_ref().clone();
                        state.version += 1;
                        Self::update_bookie(&mut state.readable_bookies, result);
                        let state = Arc::new(state);
                        let mut snapshot = snapshot.lock().unwrap();
                        snapshot.state = state;
                    },
                }
            }
        });
    }

    pub async fn new<T: BookieRegistrationClient>(bookie_client: &mut T) -> Result<BookieRegistry, BkError> {
        let (initial_writable_bookies, writable_bookies_stream) = bookie_client.watch_writable_bookies().await?;
        let (initial_readable_bookies, readable_bookies_stream) = bookie_client.watch_readable_bookies().await?;
        let state = Arc::new(BookieRegistryState {
            version: 1,
            writable_bookies: initial_writable_bookies
                .into_iter()
                .map(|bookie| (bookie.bookie_id.clone(), bookie))
                .collect(),
            readable_bookies: initial_readable_bookies
                .into_iter()
                .map(|bookie| (bookie.bookie_id.clone(), bookie))
                .collect(),
        });
        let (sender, receiver) = oneshot::channel();
        let snapshot = Arc::new(Mutex::new(BookieRegistrySnapshot { state }));
        Self::start(snapshot.clone(), receiver, writable_bookies_stream, readable_bookies_stream);
        Ok(BookieRegistry { snapshot, _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))) })
    }
}

impl BookieRegistrySnapshot {
    pub fn get_service_info(&self, bookie_id: &str) -> Option<&BookieServiceInfo> {
        return self.state.writable_bookies.get(bookie_id).or_else(|| self.state.readable_bookies.get(bookie_id));
    }

    pub fn writable_bookies(&self) -> &HashMap<BookieId, BookieServiceInfo> {
        &self.state.writable_bookies
    }
}