bookkeeper_client/meta/
util.rs

1use std::collections::HashMap;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::{Arc, Mutex};
5
6use ignore_result::Ignore;
7use tokio::sync::oneshot;
8use tokio::{select, task};
9
10use crate::client::errors::BkError;
11use crate::client::BookieId;
12use crate::meta::types::{BookieRegistrationClient, BookieServiceInfo, BookieUpdate, BookieUpdateStream};
13
14#[derive(Clone)]
15struct BookieRegistryState {
16    version: i64,
17    writable_bookies: HashMap<BookieId, BookieServiceInfo>,
18    readable_bookies: HashMap<BookieId, BookieServiceInfo>,
19}
20
21#[derive(Clone)]
22pub struct BookieRegistrySnapshot {
23    state: Arc<BookieRegistryState>,
24}
25
26struct BookieRegistryLifetime(MaybeUninit<oneshot::Sender<()>>);
27
28impl Drop for BookieRegistryLifetime {
29    fn drop(&mut self) {
30        let sender = unsafe { ptr::read(self.0.as_ptr()) };
31        sender.send(()).ignore();
32    }
33}
34
35#[derive(Clone)]
36pub struct BookieRegistry {
37    snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
38    _lifetime: Arc<BookieRegistryLifetime>,
39}
40
41impl BookieRegistry {
42    pub fn with_bookies(bookies: &str) -> Result<BookieRegistry, BkError> {
43        let result: Result<HashMap<BookieId, BookieServiceInfo>, BkError> = bookies
44            .split(',')
45            .map(BookieId::new)
46            .map(|bookie_id| match BookieServiceInfo::from_legacy(bookie_id) {
47                Ok(bookie) => Ok((bookie.bookie_id.clone(), bookie)),
48                Err(err) => Err(err),
49            })
50            .collect();
51        let bookies = result?;
52        let (sender, _) = oneshot::channel();
53        let snapshot = BookieRegistrySnapshot {
54            state: Arc::new(BookieRegistryState {
55                version: 0,
56                writable_bookies: bookies,
57                readable_bookies: HashMap::new(),
58            }),
59        };
60        let registry = BookieRegistry {
61            snapshot: Arc::new(Mutex::new(snapshot)),
62            _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))),
63        };
64        Ok(registry)
65    }
66
67    pub fn snapshot(&self) -> BookieRegistrySnapshot {
68        let state = Self::extract_state(&self.snapshot);
69        BookieRegistrySnapshot { state }
70    }
71
72    pub fn update(&self, snapshot: &mut BookieRegistrySnapshot) {
73        if let Some(state) = self.get_newer_state(snapshot.state.version) {
74            snapshot.state = state;
75        }
76    }
77
78    fn get_newer_state(&self, version: i64) -> Option<Arc<BookieRegistryState>> {
79        let snapshot = self.snapshot.lock().unwrap();
80        if snapshot.state.version > version {
81            return Some(snapshot.state.clone());
82        }
83        None
84    }
85
86    fn extract_state(snapshot: &Mutex<BookieRegistrySnapshot>) -> Arc<BookieRegistryState> {
87        return snapshot.lock().unwrap().state.clone();
88    }
89
90    fn update_bookie(bookies: &mut HashMap<BookieId, BookieServiceInfo>, update: BookieUpdate) {
91        match update {
92            BookieUpdate::Remove(bookie_id) => {
93                bookies.remove(&bookie_id);
94            },
95            BookieUpdate::Add(bookie) => {
96                bookies.insert(bookie.bookie_id.clone(), bookie);
97            },
98            BookieUpdate::Reconstruction(new_bookies) => {
99                let mut new_bookies: HashMap<BookieId, BookieServiceInfo> =
100                    new_bookies.into_iter().map(|bookie| (bookie.bookie_id.clone(), bookie)).collect();
101                std::mem::swap(bookies, &mut new_bookies);
102            },
103        };
104    }
105
106    fn start(
107        snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
108        mut receiver: oneshot::Receiver<()>,
109        mut writable_updates: Box<dyn BookieUpdateStream>,
110        mut readable_updates: Box<dyn BookieUpdateStream>,
111    ) {
112        task::spawn(async move {
113            loop {
114                select! {
115                    _ = &mut receiver => {
116                        break;
117                    },
118                    update = writable_updates.next() => {
119                        let Ok(result) = update else {
120                            continue;
121                        };
122                        let mut state = Self::extract_state(&snapshot).as_ref().clone();
123                        state.version += 1;
124                        Self::update_bookie(&mut state.writable_bookies, result);
125                        let state = Arc::new(state);
126                        let mut snapshot = snapshot.lock().unwrap();
127                        snapshot.state = state;
128                    },
129                    update = readable_updates.next() => {
130                        let Ok(result) = update else {
131                            continue;
132                        };
133                        let mut state = Self::extract_state(&snapshot).as_ref().clone();
134                        state.version += 1;
135                        Self::update_bookie(&mut state.readable_bookies, result);
136                        let state = Arc::new(state);
137                        let mut snapshot = snapshot.lock().unwrap();
138                        snapshot.state = state;
139                    },
140                }
141            }
142        });
143    }
144
145    pub async fn new<T: BookieRegistrationClient>(bookie_client: &mut T) -> Result<BookieRegistry, BkError> {
146        let (initial_writable_bookies, writable_bookies_stream) = bookie_client.watch_writable_bookies().await?;
147        let (initial_readable_bookies, readable_bookies_stream) = bookie_client.watch_readable_bookies().await?;
148        let state = Arc::new(BookieRegistryState {
149            version: 1,
150            writable_bookies: initial_writable_bookies
151                .into_iter()
152                .map(|bookie| (bookie.bookie_id.clone(), bookie))
153                .collect(),
154            readable_bookies: initial_readable_bookies
155                .into_iter()
156                .map(|bookie| (bookie.bookie_id.clone(), bookie))
157                .collect(),
158        });
159        let (sender, receiver) = oneshot::channel();
160        let snapshot = Arc::new(Mutex::new(BookieRegistrySnapshot { state }));
161        Self::start(snapshot.clone(), receiver, writable_bookies_stream, readable_bookies_stream);
162        Ok(BookieRegistry { snapshot, _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))) })
163    }
164}
165
166impl BookieRegistrySnapshot {
167    pub fn get_service_info(&self, bookie_id: &str) -> Option<&BookieServiceInfo> {
168        return self.state.writable_bookies.get(bookie_id).or_else(|| self.state.readable_bookies.get(bookie_id));
169    }
170
171    pub fn writable_bookies(&self) -> &HashMap<BookieId, BookieServiceInfo> {
172        &self.state.writable_bookies
173    }
174}