bookkeeper_client/meta/
util.rs1use std::collections::HashMap;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::{Arc, Mutex};
5
6use ignore_result::Ignore;
7use tokio::sync::oneshot;
8use tokio::{select, task};
9
10use crate::client::errors::BkError;
11use crate::client::BookieId;
12use crate::meta::types::{BookieRegistrationClient, BookieServiceInfo, BookieUpdate, BookieUpdateStream};
13
/// Versioned view of the bookies known to a registry.
///
/// A published state is shared behind an `Arc` and replaced wholesale: the task spawned by
/// `BookieRegistry::start` clones the current state, applies one update to the copy, bumps
/// `version`, and stores the copy as the new current state.
#[derive(Clone)]
struct BookieRegistryState {
    /// Incremented by one for every update applied; `BookieRegistry::get_newer_state`
    /// compares it to decide whether a caller's snapshot is stale.
    version: i64,
    /// Bookies fed by the writable-bookies watch (or by `BookieRegistry::with_bookies`),
    /// keyed by bookie id.
    writable_bookies: HashMap<BookieId, BookieServiceInfo>,
    /// Bookies fed by the readable-bookies watch, keyed by bookie id.
    readable_bookies: HashMap<BookieId, BookieServiceInfo>,
}
20
/// Handle to one published registry state.
///
/// Cloning a snapshot only clones the inner `Arc`; the bookie maps themselves are shared.
/// A snapshot keeps pointing at the same state until `BookieRegistry::update` replaces it.
#[derive(Clone)]
pub struct BookieRegistrySnapshot {
    /// The state this snapshot observes.
    state: Arc<BookieRegistryState>,
}
25
/// Owns the sending half of the shutdown channel; its `Drop` impl moves the sender out and
/// sends `()` on it. The sender sits in a `MaybeUninit` so it can be moved out of `&mut self`
/// in `drop`, since `oneshot::Sender::send` consumes the sender.
struct BookieRegistryLifetime(MaybeUninit<oneshot::Sender<()>>);
27
28impl Drop for BookieRegistryLifetime {
29 fn drop(&mut self) {
30 let sender = unsafe { ptr::read(self.0.as_ptr()) };
31 sender.send(()).ignore();
32 }
33}
34
/// Registry of known bookies that hands out versioned snapshots.
///
/// Clones share both the current snapshot and the lifetime token.
#[derive(Clone)]
pub struct BookieRegistry {
    /// Current snapshot; `BookieRegistry::new` also hands a clone of this `Arc` to the
    /// background task started by `BookieRegistry::start`, which replaces the state inside.
    snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
    /// Shared token whose `Drop` impl sends on the shutdown channel once the last clone of
    /// the registry is dropped.
    _lifetime: Arc<BookieRegistryLifetime>,
}
40
41impl BookieRegistry {
42 pub fn with_bookies(bookies: &str) -> Result<BookieRegistry, BkError> {
43 let result: Result<HashMap<BookieId, BookieServiceInfo>, BkError> = bookies
44 .split(',')
45 .map(BookieId::new)
46 .map(|bookie_id| match BookieServiceInfo::from_legacy(bookie_id) {
47 Ok(bookie) => Ok((bookie.bookie_id.clone(), bookie)),
48 Err(err) => Err(err),
49 })
50 .collect();
51 let bookies = result?;
52 let (sender, _) = oneshot::channel();
53 let snapshot = BookieRegistrySnapshot {
54 state: Arc::new(BookieRegistryState {
55 version: 0,
56 writable_bookies: bookies,
57 readable_bookies: HashMap::new(),
58 }),
59 };
60 let registry = BookieRegistry {
61 snapshot: Arc::new(Mutex::new(snapshot)),
62 _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))),
63 };
64 Ok(registry)
65 }
66
67 pub fn snapshot(&self) -> BookieRegistrySnapshot {
68 let state = Self::extract_state(&self.snapshot);
69 BookieRegistrySnapshot { state }
70 }
71
72 pub fn update(&self, snapshot: &mut BookieRegistrySnapshot) {
73 if let Some(state) = self.get_newer_state(snapshot.state.version) {
74 snapshot.state = state;
75 }
76 }
77
78 fn get_newer_state(&self, version: i64) -> Option<Arc<BookieRegistryState>> {
79 let snapshot = self.snapshot.lock().unwrap();
80 if snapshot.state.version > version {
81 return Some(snapshot.state.clone());
82 }
83 None
84 }
85
86 fn extract_state(snapshot: &Mutex<BookieRegistrySnapshot>) -> Arc<BookieRegistryState> {
87 return snapshot.lock().unwrap().state.clone();
88 }
89
90 fn update_bookie(bookies: &mut HashMap<BookieId, BookieServiceInfo>, update: BookieUpdate) {
91 match update {
92 BookieUpdate::Remove(bookie_id) => {
93 bookies.remove(&bookie_id);
94 },
95 BookieUpdate::Add(bookie) => {
96 bookies.insert(bookie.bookie_id.clone(), bookie);
97 },
98 BookieUpdate::Reconstruction(new_bookies) => {
99 let mut new_bookies: HashMap<BookieId, BookieServiceInfo> =
100 new_bookies.into_iter().map(|bookie| (bookie.bookie_id.clone(), bookie)).collect();
101 std::mem::swap(bookies, &mut new_bookies);
102 },
103 };
104 }
105
106 fn start(
107 snapshot: Arc<Mutex<BookieRegistrySnapshot>>,
108 mut receiver: oneshot::Receiver<()>,
109 mut writable_updates: Box<dyn BookieUpdateStream>,
110 mut readable_updates: Box<dyn BookieUpdateStream>,
111 ) {
112 task::spawn(async move {
113 loop {
114 select! {
115 _ = &mut receiver => {
116 break;
117 },
118 update = writable_updates.next() => {
119 let Ok(result) = update else {
120 continue;
121 };
122 let mut state = Self::extract_state(&snapshot).as_ref().clone();
123 state.version += 1;
124 Self::update_bookie(&mut state.writable_bookies, result);
125 let state = Arc::new(state);
126 let mut snapshot = snapshot.lock().unwrap();
127 snapshot.state = state;
128 },
129 update = readable_updates.next() => {
130 let Ok(result) = update else {
131 continue;
132 };
133 let mut state = Self::extract_state(&snapshot).as_ref().clone();
134 state.version += 1;
135 Self::update_bookie(&mut state.readable_bookies, result);
136 let state = Arc::new(state);
137 let mut snapshot = snapshot.lock().unwrap();
138 snapshot.state = state;
139 },
140 }
141 }
142 });
143 }
144
145 pub async fn new<T: BookieRegistrationClient>(bookie_client: &mut T) -> Result<BookieRegistry, BkError> {
146 let (initial_writable_bookies, writable_bookies_stream) = bookie_client.watch_writable_bookies().await?;
147 let (initial_readable_bookies, readable_bookies_stream) = bookie_client.watch_readable_bookies().await?;
148 let state = Arc::new(BookieRegistryState {
149 version: 1,
150 writable_bookies: initial_writable_bookies
151 .into_iter()
152 .map(|bookie| (bookie.bookie_id.clone(), bookie))
153 .collect(),
154 readable_bookies: initial_readable_bookies
155 .into_iter()
156 .map(|bookie| (bookie.bookie_id.clone(), bookie))
157 .collect(),
158 });
159 let (sender, receiver) = oneshot::channel();
160 let snapshot = Arc::new(Mutex::new(BookieRegistrySnapshot { state }));
161 Self::start(snapshot.clone(), receiver, writable_bookies_stream, readable_bookies_stream);
162 Ok(BookieRegistry { snapshot, _lifetime: Arc::new(BookieRegistryLifetime(MaybeUninit::new(sender))) })
163 }
164}
165
166impl BookieRegistrySnapshot {
167 pub fn get_service_info(&self, bookie_id: &str) -> Option<&BookieServiceInfo> {
168 return self.state.writable_bookies.get(bookie_id).or_else(|| self.state.readable_bookies.get(bookie_id));
169 }
170
171 pub fn writable_bookies(&self) -> &HashMap<BookieId, BookieServiceInfo> {
172 &self.state.writable_bookies
173 }
174}