netidx/resolver_server/
shard_store.rs

1use super::{
2    auth::{Permissions, UserInfo},
3    secctx::{SecCtx, SecCtxDataReadGuard},
4    store::{self, COLS_POOL, MAX_READ_BATCH, MAX_WRITE_BATCH, PATH_POOL, REF_POOL},
5};
6use crate::{
7    channel::Channel,
8    pack::Z64,
9    path::Path,
10    protocol::{
11        glob::Scope,
12        resolver::{
13            FromRead, FromWrite, GetChangeNr, ListMatching, Publisher, PublisherId,
14            Referral, Resolved, Table, ToRead, ToWrite,
15        },
16    },
17};
18use anyhow::Result;
19use chrono::prelude::*;
20use futures::{
21    channel::{
22        mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
23        oneshot::{self, Canceled},
24    },
25    future::join_all,
26    prelude::*,
27    select,
28};
29use fxhash::FxHashMap;
30use log::{info, trace};
31use poolshark::global::{GPooled, Pool};
32use std::{
33    collections::{hash_map::DefaultHasher, BTreeMap, HashMap, HashSet, VecDeque},
34    hash::{Hash, Hasher},
35    mem,
36    net::SocketAddr,
37    ops::Deref,
38    result,
39    sync::{Arc, LazyLock},
40    time::SystemTime,
41};
42use tokio::task;
43
44type ReadB = Vec<(u64, ToRead)>;
45type ReadR = VecDeque<(u64, FromRead)>;
46type WriteB = Vec<(u64, ToWrite)>;
47type WriteR = Vec<(u64, FromWrite)>;
48
49static PUBLISHERS_POOL: LazyLock<Pool<FxHashMap<PublisherId, Publisher>>> =
50    LazyLock::new(|| Pool::new(100, 1000));
51static TO_READ_POOL: LazyLock<Pool<ReadB>> = LazyLock::new(|| Pool::new(100, 10_000));
52static FROM_READ_POOL: LazyLock<Pool<ReadR>> = LazyLock::new(|| Pool::new(100, 10_000));
53static TO_WRITE_POOL: LazyLock<Pool<WriteB>> = LazyLock::new(|| Pool::new(100, 10_000));
54static REPLIES: LazyLock<Pool<Vec<GPooled<ReadR>>>> =
55    LazyLock::new(|| Pool::new(10, 1024));
56static FROM_WRITE_POOL: LazyLock<Pool<WriteR>> = LazyLock::new(|| Pool::new(100, 10_000));
57static COLS_HPOOL: LazyLock<Pool<HashMap<Path, Z64>>> =
58    LazyLock::new(|| Pool::new(32, 10_000));
59static PATH_HPOOL: LazyLock<Pool<HashSet<Path>>> =
60    LazyLock::new(|| Pool::new(32, 10_000));
61static PATH_BPOOL: LazyLock<Pool<Vec<GPooled<Vec<Path>>>>> =
62    LazyLock::new(|| Pool::new(32, 1024));
63static READ_SHARD_BATCH: LazyLock<Pool<Vec<GPooled<ReadB>>>> =
64    LazyLock::new(|| Pool::new(100, 1024));
65static WRITE_SHARD_BATCH: LazyLock<Pool<Vec<GPooled<WriteB>>>> =
66    LazyLock::new(|| Pool::new(100, 1024));
67
68struct ReadRequest {
69    uifo: Arc<UserInfo>,
70    batch: GPooled<ReadB>,
71}
72
73struct ReadResponse {
74    publishers: GPooled<FxHashMap<PublisherId, Publisher>>,
75    batch: GPooled<ReadR>,
76}
77
78struct WriteRequest {
79    uifo: Arc<UserInfo>,
80    publisher: Arc<Publisher>,
81    batch: GPooled<WriteB>,
82}
83
84#[derive(Clone)]
85struct Shard {
86    read: UnboundedSender<(ReadRequest, oneshot::Sender<ReadResponse>)>,
87    write: UnboundedSender<(WriteRequest, oneshot::Sender<GPooled<WriteR>>)>,
88    internal: UnboundedSender<(PublisherId, oneshot::Sender<HashSet<Path>>)>,
89}
90
91impl Shard {
92    fn new(
93        shard: usize,
94        parent: Option<Referral>,
95        children: BTreeMap<Path, Referral>,
96        secctx: SecCtx,
97        resolver: SocketAddr,
98    ) -> Self {
99        let (read, read_rx) = unbounded();
100        let (write, write_rx) = unbounded();
101        let (internal, mut internal_rx) = unbounded();
102        let mut read_rx = read_rx.fuse();
103        let mut write_rx = write_rx.fuse();
104        let t = Shard { read, write, internal };
105        task::spawn(async move {
106            let mut last_shrink = Utc::now();
107            let mut store = store::Store::new(parent, children);
108            loop {
109                select! {
110                    batch = read_rx.next() => match batch {
111                        None => break,
112                        Some((req, reply)) => {
113                let secctx = secctx.read().await;
114                            let r = Shard::process_read_batch(
115                                shard,
116                                &mut store,
117                                &secctx,
118                                resolver,
119                                req
120                            ).await;
121                            let _ = reply.send(r);
122                        }
123                    },
124                    batch = write_rx.next() => match batch {
125                        None => break,
126                        Some((req, reply)) => {
127                let secctx = secctx.read().await;
128                            let r = Shard::process_write_batch(
129                                &mut store,
130                                &secctx,
131                                req
132                            ).await;
133                            let _ = reply.send(r);
134                        }
135                    },
136                    id = internal_rx.next() => match id {
137                        None => break,
138                        Some((id, reply)) => {
139                            let _ = reply.send(store.published_for_id(&id));
140                        }
141                    }
142                }
143                let now = Utc::now();
144                if now - last_shrink > chrono::Duration::hours(1) {
145                    last_shrink = now;
146                    store.shrink_to_fit()
147                }
148            }
149            info!("shard loop finished")
150        });
151        t
152    }
153
154    async fn process_read_batch<'a>(
155        shard: usize,
156        store: &mut store::Store,
157        secctx: &SecCtxDataReadGuard<'a>,
158        resolver: SocketAddr,
159        mut req: ReadRequest,
160    ) -> ReadResponse {
161        // things would need to be massively screwed for this to fail
162        let now =
163            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
164        let mut resp = ReadResponse {
165            publishers: PUBLISHERS_POOL.take(),
166            batch: FROM_READ_POOL.take(),
167        };
168        let uifo = req.uifo;
169        let pmap = secctx.pmap();
170        let mut n = 0;
171        for (id, m) in req.batch.drain(..) {
172            if n > 10_000 {
173                n = 0;
174                task::yield_now().await
175            }
176            resp.batch.push_back(match m {
177                ToRead::Resolve(path) => {
178                    n += 1;
179                    if let Some(r) = store.check_referral(&path) {
180                        (id, FromRead::Referral(r))
181                    } else {
182                        match pmap {
183                            None => {
184                                let (flags, publishers) =
185                                    store.resolve(&mut resp.publishers, &path);
186                                let a = Resolved {
187                                    resolver,
188                                    publishers,
189                                    timestamp: now,
190                                    permissions: Permissions::all().bits(),
191                                    flags,
192                                };
193                                (id, FromRead::Resolved(a))
194                            }
195                            Some(pmap) => {
196                                let perm = pmap.permissions(&*path, &*uifo);
197                                if !perm.contains(Permissions::SUBSCRIBE) {
198                                    (id, FromRead::Denied)
199                                } else {
200                                    let (flags, publishers) = store.resolve_and_sign(
201                                        &mut resp.publishers,
202                                        &secctx,
203                                        &uifo,
204                                        now,
205                                        perm,
206                                        &path,
207                                    );
208                                    let a = Resolved {
209                                        resolver,
210                                        publishers,
211                                        timestamp: now,
212                                        permissions: perm.bits(),
213                                        flags,
214                                    };
215                                    (id, FromRead::Resolved(a))
216                                }
217                            }
218                        }
219                    }
220                }
221                ToRead::List(path) => {
222                    n += 10;
223                    if let Some(r) = store.check_referral(&path) {
224                        (id, FromRead::Referral(r))
225                    } else {
226                        let allowed = pmap
227                            .map(|pmap| pmap.allowed(&*path, Permissions::LIST, &*uifo))
228                            .unwrap_or(true);
229                        if allowed {
230                            (id, FromRead::List(store.list(&path)))
231                        } else {
232                            (id, FromRead::Denied)
233                        }
234                    }
235                }
236                ToRead::ListMatching(set) => {
237                    n += 1000;
238                    let mut referrals = REF_POOL.take();
239                    if shard == 0 {
240                        for glob in set.iter() {
241                            store.referrals_in_scope(
242                                &mut *referrals,
243                                glob.base(),
244                                glob.scope(),
245                            )
246                        }
247                    }
248                    let allowed = pmap
249                        .map(|pmap| {
250                            set.iter().all(|g| {
251                                pmap.allowed_in_scope(
252                                    g.base(),
253                                    g.scope(),
254                                    Permissions::LIST,
255                                    &*uifo,
256                                )
257                            })
258                        })
259                        .unwrap_or(true);
260                    if !allowed {
261                        let lm = ListMatching { referrals, matched: PATH_BPOOL.take() };
262                        (id, FromRead::ListMatching(lm))
263                    } else {
264                        let mut matched = PATH_BPOOL.take();
265                        matched.push(store.list_matching(&set));
266                        let lm = ListMatching { referrals, matched };
267                        (id, FromRead::ListMatching(lm))
268                    }
269                }
270                ToRead::GetChangeNr(path) => {
271                    n += 1;
272                    let mut referrals = REF_POOL.take();
273                    if shard == 0 {
274                        store.referrals_in_scope(&mut referrals, &*path, &Scope::Subtree);
275                    }
276                    let allowed = pmap
277                        .map(|pmap| pmap.allowed(&*path, Permissions::LIST, &*uifo))
278                        .unwrap_or(true);
279                    if !allowed {
280                        let change_number = Z64(0);
281                        let cn = GetChangeNr { change_number, referrals, resolver };
282                        (id, FromRead::GetChangeNr(cn))
283                    } else {
284                        let change_number = store.get_change_nr(&path);
285                        let cn = GetChangeNr { change_number, referrals, resolver };
286                        (id, FromRead::GetChangeNr(cn))
287                    }
288                }
289                ToRead::Table(path) => {
290                    n += 10;
291                    if let Some(r) = store.check_referral(&path) {
292                        (id, FromRead::Referral(r))
293                    } else {
294                        let allowed = pmap
295                            .map(|pmap| pmap.allowed(&*path, Permissions::LIST, &*uifo))
296                            .unwrap_or(true);
297                        if !allowed {
298                            (id, FromRead::Denied)
299                        } else {
300                            let rows = store.list(&path);
301                            let cols = store.columns(&path);
302                            (id, FromRead::Table(Table { rows, cols }))
303                        }
304                    }
305                }
306            })
307        }
308        resp
309    }
310
311    async fn process_write_batch<'a>(
312        store: &mut store::Store,
313        secctx: &SecCtxDataReadGuard<'a>,
314        mut req: WriteRequest,
315    ) -> GPooled<WriteR> {
316        let uifo = &*req.uifo;
317        let publisher = req.publisher;
318        let pmap = secctx.pmap();
319        let publish = |s: &mut store::Store,
320                       path: Path,
321                       default: bool,
322                       flags: Option<u32>|
323         -> FromWrite {
324            if !Path::is_absolute(&*path) {
325                FromWrite::Error("absolute paths required".into())
326            } else if let Some(r) = s.check_referral(&path) {
327                FromWrite::Referral(r)
328            } else {
329                let perm = if default {
330                    Permissions::PUBLISH_DEFAULT
331                } else {
332                    Permissions::PUBLISH
333                };
334                if pmap.map(|p| p.allowed(&*path, perm, uifo)).unwrap_or(true) {
335                    s.publish(path, &publisher, default, flags);
336                    FromWrite::Published
337                } else {
338                    FromWrite::Denied
339                }
340            }
341        };
342        let mut resp = FROM_WRITE_POOL.take();
343        let mut n = 0;
344        for (id, m) in req.batch.drain(..) {
345            if n > 5_000 {
346                n = 0;
347                task::yield_now().await;
348            }
349            resp.push(match m {
350                ToWrite::Heartbeat => unreachable!(),
351                ToWrite::Clear => {
352                    n += 1000;
353                    store.clear(&publisher);
354                    (id, FromWrite::Unpublished)
355                }
356                ToWrite::Publish(path) => {
357                    n += 1;
358                    (id, publish(store, path, false, None))
359                }
360                ToWrite::PublishDefault(path) => {
361                    n += 1;
362                    (id, publish(store, path, true, None))
363                }
364                ToWrite::PublishWithFlags(path, flags) => {
365                    n += 1;
366                    (id, publish(store, path, false, Some(flags)))
367                }
368                ToWrite::PublishDefaultWithFlags(path, flags) => {
369                    n += 1;
370                    (id, publish(store, path, true, Some(flags)))
371                }
372                ToWrite::Unpublish(path) => {
373                    n += 5;
374                    if !Path::is_absolute(&*path) {
375                        (id, FromWrite::Error("absolute paths required".into()))
376                    } else if let Some(r) = store.check_referral(&path) {
377                        (id, FromWrite::Referral(r))
378                    } else {
379                        store.unpublish(&publisher, false, path);
380                        (id, FromWrite::Unpublished)
381                    }
382                }
383                ToWrite::UnpublishDefault(path) => {
384                    n += 5;
385                    if !Path::is_absolute(&*path) {
386                        (id, FromWrite::Error("absolute paths required".into()))
387                    } else if let Some(r) = store.check_referral(&path) {
388                        (id, FromWrite::Referral(r))
389                    } else {
390                        store.unpublish(&publisher, true, path);
391                        (id, FromWrite::Unpublished)
392                    }
393                }
394            })
395        }
396        resp
397    }
398}
399
400macro_rules! same {
401    ($con:expr, $replies:expr, $res:expr, $msg:expr) => {
402        for i in 1..$replies.len() {
403            let (_, v) = $replies[i].pop_front().unwrap();
404            if &v != $res {
405                panic!($msg);
406            }
407        }
408        $con.queue_send($res)?;
409    };
410}
411
412struct QueuedWrite {
413    uifo: Arc<UserInfo>,
414    publisher: Arc<Publisher>,
415    msgs: GPooled<Vec<ToWrite>>,
416    result: oneshot::Sender<Result<GPooled<Vec<(u64, FromWrite)>>>>,
417}
418
419pub(super) struct StoreInner {
420    shards: Vec<Shard>,
421    shard_mask: usize,
422    tx_write: UnboundedSender<QueuedWrite>,
423}
424
425#[derive(Clone)]
426pub(super) struct Store(Arc<StoreInner>);
427
428impl Deref for Store {
429    type Target = StoreInner;
430
431    fn deref(&self) -> &Self::Target {
432        &self.0
433    }
434}
435
436impl Store {
437    pub(super) fn new(
438        parent: Option<Referral>,
439        children: BTreeMap<Path, Referral>,
440        secctx: SecCtx,
441        resolver: SocketAddr,
442    ) -> Self {
443        let shards = std::cmp::max(1, num_cpus::get().next_power_of_two());
444        let shard_mask = shards - 1;
445        let shards = (0..shards)
446            .into_iter()
447            .map(|i| {
448                Shard::new(i, parent.clone(), children.clone(), secctx.clone(), resolver)
449            })
450            .collect();
451        let (tx_write, rx_write) = unbounded();
452        let t = Store(Arc::new(StoreInner { shards, shard_mask, tx_write }));
453        task::spawn({
454            let t = t.clone();
455            async { t.write_task(rx_write).await }
456        });
457        t
458    }
459
460    async fn handle_queued_write(
461        &self,
462        uifo: Arc<UserInfo>,
463        publisher: Arc<Publisher>,
464        mut msgs: impl Iterator<Item = ToWrite>,
465    ) -> Result<GPooled<Vec<(u64, FromWrite)>>> {
466        trace!("handling write from {:?}", &publisher);
467        let mut finished = false;
468        let mut n = 0;
469        let mut replies = FROM_WRITE_POOL.take();
470        loop {
471            let mut by_shard = self.write_shard_batch();
472            for _ in 0..MAX_WRITE_BATCH {
473                match msgs.next() {
474                    None => {
475                        finished = true;
476                        break;
477                    }
478                    Some(ToWrite::Heartbeat) => continue,
479                    Some(ToWrite::Clear) => {
480                        for b in by_shard.iter_mut() {
481                            b.push((n, ToWrite::Clear));
482                        }
483                    }
484                    Some(ToWrite::Publish(path)) => {
485                        let s = self.shard(&path);
486                        by_shard[s].push((n, ToWrite::Publish(path)));
487                    }
488                    Some(ToWrite::Unpublish(path)) => {
489                        let s = self.shard(&path);
490                        by_shard[s].push((n, ToWrite::Unpublish(path)));
491                    }
492                    Some(ToWrite::UnpublishDefault(path)) => {
493                        for b in by_shard.iter_mut() {
494                            b.push((n, ToWrite::UnpublishDefault(path.clone())));
495                        }
496                    }
497                    Some(ToWrite::PublishDefault(path)) => {
498                        for b in by_shard.iter_mut() {
499                            b.push((n, ToWrite::PublishDefault(path.clone())));
500                        }
501                    }
502                    Some(ToWrite::PublishWithFlags(path, flags)) => {
503                        let s = self.shard(&path);
504                        by_shard[s].push((n, ToWrite::PublishWithFlags(path, flags)));
505                    }
506                    Some(ToWrite::PublishDefaultWithFlags(path, flags)) => {
507                        for b in by_shard.iter_mut() {
508                            b.push((
509                                n,
510                                ToWrite::PublishDefaultWithFlags(path.clone(), flags),
511                            ));
512                        }
513                    }
514                }
515                n += 1;
516            }
517            trace!("handle_write_batch dispatching {} messages to shards", n);
518            if by_shard.iter().all(|v| v.is_empty()) {
519                assert!(finished);
520                break;
521            }
522            let mut r = join_all(by_shard.drain(..).enumerate().map(|(i, batch)| {
523                let (tx, rx) = oneshot::channel();
524                let publisher = publisher.clone();
525                let req = WriteRequest { uifo: uifo.clone(), publisher, batch };
526                let _ = self.shards[i].write.unbounded_send((req, tx));
527                rx
528            }))
529            .await
530            .into_iter()
531            .collect::<result::Result<Vec<GPooled<WriteR>>, Canceled>>()?;
532            for mut r in r.drain(..) {
533                replies.extend(r.drain(..))
534            }
535        }
536        Ok(replies)
537    }
538
539    async fn write_task(self, mut rx: UnboundedReceiver<QueuedWrite>) {
540        while let Some(mut w) = rx.next().await {
541            let msgs = w.msgs.drain(..);
542            let res = self.handle_queued_write(w.uifo, w.publisher, msgs).await;
543            let _ = w.result.send(res);
544        }
545        info!("write task shutting down")
546    }
547
548    fn shard(&self, path: &Path) -> usize {
549        let mut hasher = DefaultHasher::new();
550        path.hash(&mut hasher);
551        hasher.finish() as usize & self.shard_mask
552    }
553
554    fn read_shard_batch(&self) -> GPooled<Vec<GPooled<ReadB>>> {
555        let mut b = READ_SHARD_BATCH.take();
556        b.extend((0..self.shards.len()).into_iter().map(|_| TO_READ_POOL.take()));
557        b
558    }
559
560    fn write_shard_batch(&self) -> GPooled<Vec<GPooled<WriteB>>> {
561        let mut b = WRITE_SHARD_BATCH.take();
562        b.extend((0..self.shards.len()).into_iter().map(|_| TO_WRITE_POOL.take()));
563        b
564    }
565
566    pub(super) async fn handle_batch_read(
567        &self,
568        con: &mut Channel,
569        uifo: Arc<UserInfo>,
570        mut msgs: impl Iterator<Item = ToRead>,
571    ) -> Result<()> {
572        let mut finished = false;
573        loop {
574            let mut n = 0;
575            let mut c = 0;
576            let mut by_shard = self.read_shard_batch();
577            while c < MAX_READ_BATCH {
578                match msgs.next() {
579                    None => {
580                        finished = true;
581                        break;
582                    }
583                    Some(ToRead::Resolve(path)) => {
584                        let s = self.shard(&path);
585                        by_shard[s].push((n, ToRead::Resolve(path)));
586                        c += 1;
587                    }
588                    Some(ToRead::GetChangeNr(path)) => {
589                        for b in by_shard.iter_mut() {
590                            b.push((n, ToRead::GetChangeNr(path.clone())));
591                        }
592                        c += 1;
593                    }
594                    Some(ToRead::List(path)) => {
595                        for b in by_shard.iter_mut() {
596                            b.push((n, ToRead::List(path.clone())));
597                        }
598                        c += 10000;
599                    }
600                    Some(ToRead::Table(path)) => {
601                        for b in by_shard.iter_mut() {
602                            b.push((n, ToRead::Table(path.clone())));
603                        }
604                        c += 10000;
605                    }
606                    Some(ToRead::ListMatching(set)) => {
607                        for b in by_shard.iter_mut() {
608                            b.push((n, ToRead::ListMatching(set.clone())));
609                        }
610                        c += 100000;
611                    }
612                }
613                n += 1;
614            }
615            if by_shard.iter().all(|v| v.is_empty()) {
616                assert!(finished);
617                break Ok(());
618            }
619            let mut replies =
620                join_all(by_shard.drain(..).enumerate().map(|(i, batch)| {
621                    let (tx, rx) = oneshot::channel();
622                    let req = ReadRequest { uifo: uifo.clone(), batch };
623                    let _ = self.shards[i].read.unbounded_send((req, tx));
624                    rx
625                }))
626                .await
627                .into_iter()
628                .collect::<result::Result<Vec<ReadResponse>, Canceled>>()?;
629            let mut publishers = PUBLISHERS_POOL.take();
630            for r in replies.iter_mut() {
631                publishers.extend(r.publishers.drain());
632            }
633            for (_, p) in publishers.drain() {
634                con.queue_send(&FromRead::Publisher(p))?;
635            }
636            let mut replies = {
637                let mut r = REPLIES.take();
638                r.extend(replies.into_iter().map(|r| r.batch));
639                r
640            };
641            for i in 0..n {
642                if replies.len() == 1
643                    || !replies
644                        .iter()
645                        .all(|v| v.front().map(|v| i == v.0).unwrap_or(false))
646                {
647                    let r = replies
648                        .iter_mut()
649                        .find_map(|r| {
650                            if r.front().map(|v| v.0 == i).unwrap_or(false) {
651                                r.pop_front()
652                            } else {
653                                None
654                            }
655                        })
656                        .unwrap()
657                        .1;
658                    con.queue_send(&r)?;
659                } else {
660                    match replies[0].pop_front().unwrap() {
661                        (_, FromRead::Publisher(_)) => unreachable!(),
662                        (_, FromRead::Resolved(_)) => unreachable!(),
663                        (_, m @ FromRead::Referral(_)) => {
664                            same!(con, replies, &m, "desynced referral");
665                        }
666                        (_, m @ FromRead::Denied) => {
667                            same!(con, replies, &m, "desynced permissions");
668                        }
669                        (_, FromRead::Error(e)) => {
670                            for i in 1..replies.len() {
671                                replies[i].pop_front().unwrap();
672                            }
673                            con.queue_send(&FromRead::Error(e))?;
674                        }
675                        (_, FromRead::List(mut paths)) => {
676                            let mut hpaths = PATH_HPOOL.take();
677                            hpaths.extend(paths.drain(..));
678                            for i in 1..replies.len() {
679                                if let (_, FromRead::List(mut p)) =
680                                    replies[i].pop_front().unwrap()
681                                {
682                                    hpaths.extend(p.drain(..));
683                                } else {
684                                    panic!("desynced list")
685                                }
686                            }
687                            let mut paths = PATH_POOL.take();
688                            paths.extend(hpaths.drain());
689                            con.queue_send(&FromRead::List(paths))?;
690                        }
691                        (_, FromRead::ListMatching(mut lm)) => {
692                            let referrals = lm.referrals;
693                            let mut matched = PATH_BPOOL.take();
694                            matched.extend(lm.matched.drain(..));
695                            for i in 1..replies.len() {
696                                if let (_, FromRead::ListMatching(mut lm)) =
697                                    replies[i].pop_front().unwrap()
698                                {
699                                    matched.extend(lm.matched.drain(..));
700                                } else {
701                                    panic!("desynced listmatching")
702                                }
703                            }
704                            con.queue_send(&FromRead::ListMatching(ListMatching {
705                                matched,
706                                referrals,
707                            }))?;
708                        }
709                        (_, FromRead::GetChangeNr(cn)) => {
710                            let referrals = cn.referrals;
711                            let resolver = cn.resolver;
712                            let mut change_number = cn.change_number;
713                            for i in 1..replies.len() {
714                                if let (_, FromRead::GetChangeNr(cn)) =
715                                    replies[i].pop_front().unwrap()
716                                {
717                                    *change_number += *cn.change_number;
718                                } else {
719                                    panic!("desynced getchangenumber")
720                                }
721                            }
722                            con.queue_send(&FromRead::GetChangeNr(GetChangeNr {
723                                referrals,
724                                resolver,
725                                change_number,
726                            }))?;
727                        }
728                        (_, FromRead::Table(Table { mut rows, mut cols })) => {
729                            let mut hrows = PATH_HPOOL.take();
730                            let mut hcols = COLS_HPOOL.take();
731                            hrows.extend(rows.drain(..));
732                            hcols.extend(cols.drain(..));
733                            for i in 1..replies.len() {
734                                if let (
735                                    _,
736                                    FromRead::Table(Table { rows: mut rs, cols: mut cs }),
737                                ) = replies[i].pop_front().unwrap()
738                                {
739                                    hrows.extend(rs.drain(..));
740                                    for (p, c) in cs.drain(..) {
741                                        hcols.entry(p).or_insert(Z64(0)).0 += c.0;
742                                    }
743                                } else {
744                                    panic!("desynced table")
745                                }
746                            }
747                            let mut rows = PATH_POOL.take();
748                            let mut cols = COLS_POOL.take();
749                            rows.extend(hrows.drain());
750                            cols.extend(hcols.drain());
751                            con.queue_send(&FromRead::Table(Table { rows, cols }))?;
752                        }
753                    }
754                }
755            }
756            con.flush().await?;
757            if finished {
758                break Ok(());
759            }
760        }
761    }
762
763    pub(super) async fn handle_batch_write(
764        &self,
765        mut con: Option<&mut Channel>,
766        uifo: Arc<UserInfo>,
767        publisher: Arc<Publisher>,
768        msgs: GPooled<Vec<ToWrite>>,
769    ) -> Result<()> {
770        let (tx, rx) = oneshot::channel();
771        self.tx_write.unbounded_send(QueuedWrite {
772            uifo: uifo.clone(),
773            publisher: publisher.clone(),
774            msgs,
775            result: tx,
776        })?;
777        let mut replies = rx.await??;
778        replies.sort_unstable_by_key(|(n, _)| *n);
779        trace!("handle_write_batch {} replies", replies.len());
780        if let Some(c) = con.as_mut()
781            && let Some((mut n, mut cur)) = replies.pop()
782        {
783            for (i, m) in replies.drain(..) {
784                if i == n {
785                    if mem::discriminant(&cur) != mem::discriminant(&m) {
786                        panic!("desynced message {cur:?} vs {m:?}")
787                    }
788                } else {
789                    n = i;
790                    c.queue_send(&cur)?;
791                    cur = m;
792                }
793            }
794            c.queue_send(&cur)?;
795            c.flush().await?;
796        }
797        trace!("handle_write_batch processed replies");
798        Ok(())
799    }
800
801    pub(super) async fn handle_clear(
802        &self,
803        uifo: Arc<UserInfo>,
804        publisher: Arc<Publisher>,
805    ) -> Result<()> {
806        use rand::{rng, seq::SliceRandom};
807        trace!("clearing publisher {:?}", &publisher);
808        let mut published_paths = join_all(self.shards.iter().map(|shard| {
809            let (tx, rx) = oneshot::channel();
810            let _ = shard.internal.unbounded_send((publisher.id, tx));
811            rx
812        }))
813        .await
814        .into_iter()
815        .flat_map(|s| s.unwrap().into_iter().map(ToWrite::Unpublish))
816        .collect::<Vec<_>>();
817        published_paths.shuffle(&mut rng());
818        // clear the vast majority of published paths using resources fairly
819        self.handle_batch_write(
820            None,
821            uifo.clone(),
822            publisher.clone(),
823            GPooled::orphan(published_paths),
824        )
825        .await?;
826        // clear out anything left over that was sent to all shards,
827        // e.g. default publishers.
828        self.handle_batch_write(
829            None,
830            uifo,
831            publisher,
832            GPooled::orphan(vec![ToWrite::Clear]),
833        )
834        .await?;
835        Ok(())
836    }
837}