crdb_client/
api_db.rs

1use crate::{
2    client_db::SavedObject,
3    connection::{
4        Command, Connection, ConnectionEvent, RequestWithSidecar, ResponsePartWithSidecar,
5        ResponseSender,
6    },
7};
8use anyhow::anyhow;
9use crdb_cache::CacheDb;
10use crdb_core::{
11    BinPtr, ClientSideDb, CrdbSyncFn, Db, Event, EventId, Importance, MaybeObject, Object,
12    ObjectData, ObjectId, Query, QueryId, Request, ResponsePart, ResultExt, SavedQuery, Session,
13    SessionRef, SessionToken, Updatedness, Updates, Upload, UploadId,
14};
15use futures::{channel::mpsc, future::Either, pin_mut, stream, FutureExt, StreamExt};
16use std::{
17    collections::{HashMap, HashSet, VecDeque},
18    future::Future,
19    iter,
20    sync::{Arc, Mutex, RwLock},
21};
22use tokio::sync::{oneshot, watch};
23
/// Decision returned by the upload error handler when the server rejects an upload.
///
/// The upload resender task acts on the returned variant.
#[non_exhaustive]
pub enum OnError {
    /// Undo the upload in the local database, then remove it from the upload queue.
    Rollback,
    /// Leave both the local database and the upload queue untouched; the error is still
    /// forwarded to whoever is waiting on the upload's result.
    KeepLocal,
    /// Undo the upload in the local database, apply the provided upload locally instead,
    /// then remove the original upload from the upload queue.
    ReplaceWith(Upload),
}
30
/// Client-side handle to the remote API: forwards commands and requests to background tasks
/// and keeps track of the queue of uploads that were not acknowledged yet.
pub struct ApiDb<LocalDb: ClientSideDb> {
    /// Channel carrying `Command`s (login, logout) to the connection task.
    connection: mpsc::UnboundedSender<Command>,
    /// Sending half of the watch channel that publishes the current list of queued uploads.
    upload_queue_watcher_sender: Arc<Mutex<watch::Sender<Vec<UploadId>>>>,
    /// Receiving half of the same watch channel; cloned by `watch_upload_queue`.
    upload_queue_watcher_receiver: watch::Receiver<Vec<UploadId>>,
    /// Local database, which also stores the upload queue.
    db: Arc<CacheDb<LocalDb>>,
    /// Channel to the upload resender task. The tuple holds the upload id (`None` for requests
    /// that are not uploads), the request itself, and the sender on which responses come back.
    upload_resender: mpsc::UnboundedSender<(
        Option<UploadId>,
        Arc<Request>,
        mpsc::UnboundedSender<ResponsePartWithSidecar>,
    )>,
    /// User callback invoked on each connection event; replaced by `on_connection_event`.
    connection_event_cb: Arc<RwLock<Box<dyn CrdbSyncFn<ConnectionEvent>>>>,
}
43
44impl<LocalDb: ClientSideDb> ApiDb<LocalDb> {
45    pub(crate) async fn new<C, GSO, GSQ, EH, EHF, RRL>(
46        db: Arc<CacheDb<LocalDb>>,
47        get_saved_objects: GSO,
48        get_saved_queries: GSQ,
49        error_handler: EH,
50        require_relogin: RRL,
51    ) -> crate::Result<(ApiDb<LocalDb>, mpsc::UnboundedReceiver<Updates>)>
52    where
53        C: crdb_core::Config,
54        GSO: 'static + waaaa::Send + FnMut() -> HashMap<ObjectId, SavedObject>,
55        GSQ: 'static + Send + FnMut() -> HashMap<QueryId, SavedQuery>,
56        EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
57        EHF: 'static + waaaa::Future<Output = OnError>,
58        RRL: 'static + waaaa::Send + Fn(),
59    {
60        let (update_sender, update_receiver) = mpsc::unbounded();
61        let connection_event_cb: Arc<RwLock<Box<dyn CrdbSyncFn<ConnectionEvent>>>> =
62            Arc::new(RwLock::new(Box::new(|_| ()) as _));
63        let event_cb = {
64            let connection_event_cb = connection_event_cb.clone();
65            Box::new(move |evt| {
66                let need_relogin = match evt {
67                    ConnectionEvent::LoggingIn => false,
68                    ConnectionEvent::FailedConnecting(_) => true,
69                    ConnectionEvent::FailedSendingToken(_) => true,
70                    ConnectionEvent::LostConnection(_) => false,
71                    ConnectionEvent::InvalidToken(_) => true,
72                    ConnectionEvent::Connected => false,
73                    ConnectionEvent::TimeOffset(_) => false,
74                    ConnectionEvent::LoggedOut => true,
75                };
76                if need_relogin {
77                    (require_relogin)();
78                }
79                connection_event_cb.read().unwrap()(evt);
80            })
81        };
82        let (connection, commands) = mpsc::unbounded();
83        let (requests, requests_receiver) = mpsc::unbounded();
84        waaaa::spawn(
85            Connection::new(
86                commands,
87                requests_receiver,
88                event_cb,
89                update_sender,
90                get_saved_objects,
91                get_saved_queries,
92            )
93            .run(),
94        );
95        let all_uploads = db
96            .list_uploads()
97            .await
98            .wrap_context("listing upload queue")?;
99        let (upload_queue_watcher_sender, upload_queue_watcher_receiver) =
100            watch::channel(all_uploads.clone());
101        let upload_queue_watcher_sender = Arc::new(Mutex::new(upload_queue_watcher_sender));
102        let (upload_resender_sender, upload_resender_receiver) = mpsc::unbounded();
103        waaaa::spawn(upload_resender::<C, _, _, _>(
104            db.clone(),
105            upload_resender_receiver,
106            requests,
107            upload_queue_watcher_sender.clone(),
108            error_handler,
109        ));
110        for upload_id in all_uploads {
111            let upload = db
112                .get_upload(upload_id)
113                .await
114                .wrap_context("retrieving upload")?
115                .ok_or_else(|| {
116                    crate::Error::Other(anyhow!(
117                        "Upload vanished from queue while doing the initial read"
118                    ))
119                })?;
120            let request = Arc::new(Request::Upload(upload));
121            let (sender, _) = mpsc::unbounded(); // Ignore the response
122            upload_resender_sender
123                .unbounded_send((Some(upload_id), request, sender))
124                .expect("connection cannot go away before apidb does");
125        }
126        Ok((
127            ApiDb {
128                db,
129                upload_queue_watcher_sender,
130                upload_queue_watcher_receiver,
131                connection,
132                upload_resender: upload_resender_sender,
133                connection_event_cb,
134            },
135            update_receiver,
136        ))
137    }
138
139    pub fn watch_upload_queue(&self) -> watch::Receiver<Vec<UploadId>> {
140        self.upload_queue_watcher_receiver.clone()
141    }
142
143    pub fn on_connection_event(&self, cb: impl 'static + CrdbSyncFn<ConnectionEvent>) {
144        *self.connection_event_cb.write().unwrap() = Box::new(cb);
145    }
146
147    pub fn login(&self, url: Arc<String>, token: SessionToken) {
148        self.connection
149            .unbounded_send(Command::Login { url, token })
150            .expect("connection cannot go away before sender does")
151    }
152
153    // TODO(api-highest): make this return when it's done logging out, and use it in ClientDb::logout
154    pub fn logout(&self) {
155        self.connection
156            .unbounded_send(Command::Logout)
157            .expect("connection cannot go away before sender does")
158    }
159
160    fn request(&self, request: Arc<Request>) -> mpsc::UnboundedReceiver<ResponsePartWithSidecar> {
161        let (sender, response) = mpsc::unbounded();
162        self.upload_resender
163            .unbounded_send((None, request, sender))
164            .expect("connection cannot go away before sender does");
165        response
166    }
167
168    pub fn rename_session(&self, name: String) -> oneshot::Receiver<crate::Result<()>> {
169        let response_receiver = self.request(Arc::new(Request::RenameSession(name)));
170        expect_simple_response(response_receiver)
171    }
172
173    pub async fn current_session(&self) -> crate::Result<Session> {
174        let response = self
175            .request(Arc::new(Request::CurrentSession))
176            .next()
177            .await
178            .ok_or_else(|| crate::Error::Other(anyhow!("Connection thread went down too early")))?;
179        match response.response {
180            ResponsePart::Sessions(mut sessions) if sessions.len() == 1 => {
181                Ok(sessions.pop().unwrap())
182            }
183            ResponsePart::Error(err) => Err(err.into()),
184            _ => Err(crate::Error::Other(anyhow!(
185                "Unexpected server response to CurrentSession: {:?}",
186                response.response
187            ))),
188        }
189    }
190
191    pub async fn list_sessions(&self) -> crate::Result<Vec<Session>> {
192        let response = self
193            .request(Arc::new(Request::ListSessions))
194            .next()
195            .await
196            .ok_or_else(|| crate::Error::Other(anyhow!("Connection thread went down too early")))?;
197        match response.response {
198            ResponsePart::Sessions(sessions) => Ok(sessions),
199            ResponsePart::Error(err) => Err(err.into()),
200            _ => Err(crate::Error::Other(anyhow!(
201                "Unexpected server response to ListSessions: {:?}",
202                response.response
203            ))),
204        }
205    }
206
207    pub fn disconnect_session(
208        &self,
209        session_ref: SessionRef,
210    ) -> oneshot::Receiver<crate::Result<()>> {
211        let response_receiver = self.request(Arc::new(Request::DisconnectSession(session_ref)));
212        expect_simple_response(response_receiver)
213    }
214
215    pub fn unsubscribe(&self, object_ids: HashSet<ObjectId>) {
216        self.request(Arc::new(Request::Unsubscribe(object_ids)));
217        // Ignore the response from the server, we don't care enough to wait for it
218    }
219
220    pub fn unsubscribe_query(&self, query_id: QueryId) {
221        self.request(Arc::new(Request::UnsubscribeQuery(query_id)));
222        // Ignore the response from the server, we don't care enough to wait for it
223    }
224
225    async fn handle_upload_response(
226        mut receiver: mpsc::UnboundedReceiver<ResponsePartWithSidecar>,
227    ) -> crate::Result<()> {
228        match receiver.next().await {
229            None => Err(crate::Error::Other(anyhow!(
230                "Connection did not return any answer to query"
231            ))),
232            Some(ResponsePartWithSidecar {
233                sidecar: Some(_), ..
234            }) => Err(crate::Error::Other(anyhow!(
235                "Connection returned sidecar while we expected a simple result"
236            ))),
237            Some(ResponsePartWithSidecar { response, .. }) => match response {
238                ResponsePart::Success => Ok(()),
239                ResponsePart::Error(err) => Err(err.into()),
240                ResponsePart::Sessions(_)
241                | ResponsePart::CurrentTime(_)
242                | ResponsePart::Objects { .. }
243                | ResponsePart::Binaries(_) => Err(crate::Error::Other(anyhow!(
244                    "Connection returned unexpected answer while expecting a simple result"
245                ))),
246            },
247        }
248    }
249
250    pub async fn create<T: Object>(
251        &self,
252        object_id: ObjectId,
253        created_at: EventId,
254        object: Arc<T>,
255        subscribe: bool,
256    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
257        let required_binaries = object.required_binaries();
258        let upload = Upload::Object {
259            object_id,
260            type_id: *T::type_ulid(),
261            created_at,
262            snapshot_version: T::snapshot_version(),
263            object: Arc::new(
264                serde_json::to_value(object)
265                    .wrap_context("serializing object for sending to api")?,
266            ),
267            subscribe,
268        };
269        let request = Arc::new(Request::Upload(upload.clone()));
270        let (result_sender, result_receiver) = mpsc::unbounded();
271        let upload_id = self
272            .db
273            .enqueue_upload(upload, required_binaries)
274            .await
275            .wrap_context("enqueuing upload")?;
276        let upload_list = self
277            .db
278            .list_uploads()
279            .await
280            .wrap_context("listing uploads")?;
281        self.upload_queue_watcher_sender
282            .lock()
283            .unwrap()
284            .send_replace(upload_list);
285        self.upload_resender
286            .unbounded_send((Some(upload_id), request, result_sender))
287            .map_err(|_| crate::Error::Other(anyhow!("Upload resender went out too early")))?;
288        Ok(Self::handle_upload_response(result_receiver))
289    }
290
291    pub async fn submit<T: Object>(
292        &self,
293        object_id: ObjectId,
294        event_id: EventId,
295        event: Arc<T::Event>,
296        subscribe: bool,
297    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
298        let required_binaries = event.required_binaries();
299        let upload = Upload::Event {
300            object_id,
301            type_id: *T::type_ulid(),
302            event_id,
303            event: Arc::new(
304                serde_json::to_value(event).wrap_context("serializing event for sending to api")?,
305            ),
306            subscribe,
307        };
308        let request = Arc::new(Request::Upload(upload.clone()));
309        let (result_sender, result_receiver) = mpsc::unbounded();
310        let upload_id = self
311            .db
312            .enqueue_upload(upload, required_binaries)
313            .await
314            .wrap_context("enqueuing upload")?;
315        let upload_list = self
316            .db
317            .list_uploads()
318            .await
319            .wrap_context("listing uploads")?;
320        self.upload_queue_watcher_sender
321            .lock()
322            .unwrap()
323            .send_replace(upload_list);
324        self.upload_resender
325            .unbounded_send((Some(upload_id), request, result_sender))
326            .map_err(|_| crate::Error::Other(anyhow!("Upload resender went out too early")))?;
327        Ok(Self::handle_upload_response(result_receiver))
328    }
329
330    pub async fn get(&self, object_id: ObjectId, subscribe: bool) -> crate::Result<ObjectData> {
331        let mut object_ids = HashMap::new();
332        object_ids.insert(object_id, None); // We do not know about this object yet, so None
333        let request = Arc::new(Request::Get {
334            object_ids,
335            subscribe,
336        });
337        let mut response = self.request(request);
338        match response.next().await {
339            None => Err(crate::Error::Other(anyhow!(
340                "Connection-handling thread went out before ApiDb"
341            ))),
342            Some(response) => match response.response {
343                ResponsePart::Error(err) => Err(err.into()),
344                ResponsePart::Objects { mut data, .. } if data.len() == 1 => {
345                    match data.pop().unwrap() {
346                        MaybeObject::AlreadySubscribed(_) => Err(crate::Error::Other(anyhow!(
347                            "Server unexpectedly told us we already know unknown {object_id:?}"
348                        ))),
349                        MaybeObject::NotYetSubscribed(res) => Ok(res),
350                    }
351                }
352                _ => Err(crate::Error::Other(anyhow!(
353                    "Unexpected response to GetSubscribe request: {:?}",
354                    response.response
355                ))),
356            },
357        }
358    }
359
360    pub fn query<T: Object>(
361        &self,
362        query_id: QueryId,
363        only_updated_since: Option<Updatedness>,
364        subscribe: bool,
365        query: Arc<Query>,
366    ) -> impl waaaa::Stream<Item = crate::Result<(MaybeObject, Option<Updatedness>)>> {
367        let request = Arc::new(Request::Query {
368            query_id,
369            type_id: *T::type_ulid(),
370            query,
371            only_updated_since,
372            subscribe,
373        });
374        self.request(request).flat_map(move |response| {
375            match response.response {
376                // No sidecar in answer to Request::Query
377                ResponsePart::Error(err) => Either::Left(stream::iter(iter::once(Err(err.into())))),
378                ResponsePart::Objects {
379                    data,
380                    now_have_all_until,
381                } => {
382                    let data_len = data.len();
383                    Either::Right(stream::iter(data.into_iter().enumerate().map(
384                        move |(i, d)| {
385                            let now_have_all_until = if i + 1 == data_len {
386                                now_have_all_until
387                            } else {
388                                None
389                            };
390                            Ok((d, now_have_all_until))
391                        },
392                    )))
393                }
394                resp => Either::Left(stream::iter(iter::once(Err(crate::Error::Other(anyhow!(
395                    "Server gave unexpected answer to QuerySubscribe request: {resp:?}"
396                )))))),
397            }
398        })
399    }
400
401    pub async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
402        let mut binary_ids = HashSet::new();
403        binary_ids.insert(binary_id);
404        let request = Arc::new(Request::GetBinaries(binary_ids));
405        let mut response = self.request(request);
406        match response.next().await {
407            None => Err(crate::Error::Other(anyhow!(
408                "Connection-handling thread went out before ApiDb"
409            ))),
410            Some(response) => match response.response {
411                ResponsePart::Error(err) => Err(err.into()),
412                ResponsePart::Binaries(1) => {
413                    let bin = response.sidecar.ok_or_else(|| {
414                        crate::Error::Other(anyhow!(
415                            "Connection thread claimed to send us one binary but actually did not"
416                        ))
417                    })?;
418                    Ok(Some(bin))
419                }
420                _ => Err(crate::Error::Other(anyhow!(
421                    "Unexpected response to get-binary request: {:?}",
422                    response.response
423                ))),
424            },
425        }
426    }
427}
428
/// Background task relaying requests from `ApiDb` to the connection task.
///
/// Requests without an upload id are forwarded once. Requests with an upload id are
/// forwarded in batches and re-sent until each got a final outcome: success, or an error
/// settled through `error_handler`. Binaries reported missing by the server are read from
/// `db` and uploaded ahead of the next attempt.
///
/// - `db`: local database holding the upload queue and the binaries.
/// - `requests`: incoming `(upload id, request, response sender)` tuples.
/// - `connection`: channel to the connection task.
/// - `upload_queue_watcher_sender`: refreshed with the upload list each time an upload leaves
///   the queue.
/// - `error_handler`: decides what to do with an upload the server rejected.
///
/// Returns once `requests` is exhausted, or as soon as a response channel for an in-flight
/// upload closes without yielding a response.
///
/// # Panics
///
/// Panics if a request carrying an upload id is not a `Request::Upload`, or if the watcher
/// mutex is poisoned.
async fn upload_resender<C, LocalDb, EH, EHF>(
    db: Arc<CacheDb<LocalDb>>,
    requests: mpsc::UnboundedReceiver<(
        Option<UploadId>,
        Arc<Request>,
        mpsc::UnboundedSender<ResponsePartWithSidecar>,
    )>,
    connection: mpsc::UnboundedSender<(ResponseSender, Arc<RequestWithSidecar>)>,
    upload_queue_watcher_sender: Arc<Mutex<watch::Sender<Vec<UploadId>>>>,
    error_handler: EH,
) where
    C: crdb_core::Config,
    LocalDb: ClientSideDb,
    EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
    EHF: 'static + waaaa::Future<Output = OnError>,
{
    // The below loop is split into two sub parts: all that is just sent once, and all that requires
    // re-sending if there were missing binaries
    // This makes sure that all uploads have resolved before a query is submitted, while still allowing
    // uploads and queries to resolve in parallel.
    let requests = requests.peekable();
    pin_mut!(requests);
    // Evaluates to true only if a request is available right now, without waiting, and it
    // satisfies `$cond`
    macro_rules! poll_next_if {
        ($cond:expr) => {
            requests
                .as_mut()
                .peek()
                .now_or_never()
                .and_then(|req| req)
                .map($cond)
                .unwrap_or(false)
        };
    }
    // Sleep here until at least one request is available; stop once the channel is closed
    while requests.as_mut().peek().await.is_some() {
        // First, handle all requests that require no re-sending. Just send them once and forget about them.
        while poll_next_if!(|(id, _, _)| id.is_none()) {
            let (upload_id, request, sender) = requests.next().await.unwrap();
            tracing::trace!(?request, "resender received non-upload request");
            assert!(upload_id.is_none(), "non-upload should not have an id");
            let _ = connection.unbounded_send((
                sender,
                Arc::new(RequestWithSidecar {
                    request,
                    sidecar: Vec::new(),
                }),
            ));
        }

        // Then, handle uploads. We start them all, and resend them with the missing binaries until we're successfully done.
        // Each entry is (upload id, request, sender to the original requester, sender given
        // to the connection, receiver for the connection's answers)
        let mut upload_reqs = VecDeque::new();
        while poll_next_if!(|(id, _, _)| id.is_some()) {
            let (upload_id, request, final_sender) = requests.next().await.unwrap();
            let upload_id = upload_id.unwrap();
            tracing::trace!(?upload_id, ?request, "resender received upload request");
            let (sender, receiver) = mpsc::unbounded();
            upload_reqs.push_back((
                upload_id,
                Arc::new(RequestWithSidecar {
                    request,
                    sidecar: Vec::new(),
                }),
                Some(final_sender),
                sender,
                receiver,
            ));
        }
        let mut upload_missing_binaries = None;
        while !upload_reqs.is_empty() {
            // Before anything, attempt to upload the missing binaries if we're at least at the second loop turn
            // Ignore the result of uploading the binaries, as it's just a prerequisite for the other uploads here
            if let Some(upload_missing_binaries) = upload_missing_binaries.take() {
                let (sender, _) = mpsc::unbounded();
                let _ = connection.unbounded_send((sender, upload_missing_binaries));
            }

            // First, submit all requests
            for (_, request, _, sender, _) in upload_reqs.iter() {
                let _ = connection.unbounded_send((sender.clone(), request.clone()));
            }

            // Then, wait for them all to finish, listing the missing binaries
            // The successful or non-retryable requests get removed from upload_reqs here, by setting their final_sender to None
            // Any path below that leaves final_sender in place (the `continue`s, the failed
            // database operations, the missing-binary cases) gets the request sent again on the
            // next turn of the enclosing loop; there is no delay between turns
            let mut missing_binaries = HashSet::new();
            for (upload_id, request, final_sender, _, receiver) in upload_reqs.iter_mut() {
                match receiver.next().await {
                    None => return, // Connection was dropped
                    Some(ResponsePartWithSidecar {
                        sidecar: Some(_), ..
                    }) => {
                        tracing::error!("got response to upload that had a sidecar");
                        continue;
                    }
                    Some(ResponsePartWithSidecar { response, .. }) => match response {
                        ResponsePart::Success => {
                            if let Err(err) = db.upload_finished(*upload_id).await {
                                tracing::error!(?err, "failed dequeuing upload");
                            } else {
                                // A failure to refresh the watcher is only logged; the requester
                                // still gets its answer
                                match db.list_uploads().await.wrap_context("listing uploads") {
                                    Err(err) => {
                                        tracing::error!(?err, "failed listing upload queue");
                                    }
                                    Ok(upload_list) => {
                                        upload_queue_watcher_sender
                                            .lock()
                                            .unwrap()
                                            .send_replace(upload_list);
                                    }
                                }
                                let _ = final_sender.take().unwrap().unbounded_send(
                                    ResponsePartWithSidecar {
                                        response,
                                        sidecar: None,
                                    },
                                );
                            }
                        }
                        ResponsePart::Error(crdb_core::SerializableError::MissingBinaries(
                            bins,
                        )) => {
                            missing_binaries.extend(bins);
                        }
                        ResponsePart::Error(crdb_core::SerializableError::ObjectDoesNotExist(
                            _,
                        )) if !missing_binaries.is_empty() => {
                            // Do nothing, and retry on the next round: this can happen if eg. object creation failed due to a missing binary
                            // If there was no missing binary yet, it means that there was no previous upload that we could retry.
                            // As such, in that situation, fall through to the next Error handling, and send the error back to the user.
                        }
                        ResponsePart::Error(ref err) => {
                            let Request::Upload(upload) = &*request.request else {
                                panic!("is_upload == true but does not match Upload");
                            };
                            // Let the user-provided handler pick how to settle the rejected upload
                            match error_handler((*upload).clone(), (*err).clone().into()).await {
                                OnError::Rollback => {
                                    // Undo locally, dequeue, refresh the watcher, then report the error
                                    if let Err(err) = undo_upload::<C, _>(&db, upload).await {
                                        tracing::error!(?err, ?upload, "failed undoing upload");
                                    } else if let Err(err) = db.upload_finished(*upload_id).await {
                                        tracing::error!(?err, "failed dequeuing upload");
                                    } else {
                                        match db
                                            .list_uploads()
                                            .await
                                            .wrap_context("listing uploads")
                                        {
                                            Err(err) => {
                                                tracing::error!(
                                                    ?err,
                                                    "failed listing upload queue"
                                                );
                                            }
                                            Ok(upload_list) => {
                                                upload_queue_watcher_sender
                                                    .lock()
                                                    .unwrap()
                                                    .send_replace(upload_list);
                                            }
                                        }
                                        let _ = final_sender.take().unwrap().unbounded_send(
                                            ResponsePartWithSidecar {
                                                response,
                                                sidecar: None,
                                            },
                                        );
                                    }
                                }
                                OnError::KeepLocal => {
                                    // Do not remove the upload from the queue, so that it gets attempted again upon next
                                    // bootup. But do take the final_sender, so that we do not end up infinite-looping here.
                                    let _ = final_sender.take().unwrap().unbounded_send(
                                        ResponsePartWithSidecar {
                                            response,
                                            sidecar: None,
                                        },
                                    );
                                }
                                OnError::ReplaceWith(new_upload) => {
                                    // Undo locally, apply the replacement locally, dequeue the
                                    // original, refresh the watcher, then report the error
                                    if let Err(err) = undo_upload::<C, _>(&db, upload).await {
                                        tracing::error!(?err, ?upload, "failed undoing upload");
                                    } else if let Err(err) =
                                        do_upload::<C, _>(&db, &new_upload).await
                                    {
                                        tracing::error!(
                                            ?err,
                                            ?new_upload,
                                            "failed doing replacement upload"
                                        );
                                    } else if let Err(err) = db.upload_finished(*upload_id).await {
                                        tracing::error!(?err, "failed dequeuing upload");
                                    } else {
                                        match db
                                            .list_uploads()
                                            .await
                                            .wrap_context("listing uploads")
                                        {
                                            Err(err) => {
                                                tracing::error!(
                                                    ?err,
                                                    "failed listing upload queue"
                                                );
                                            }
                                            Ok(upload_list) => {
                                                upload_queue_watcher_sender
                                                    .lock()
                                                    .unwrap()
                                                    .send_replace(upload_list);
                                            }
                                        }
                                        let _ = final_sender.take().unwrap().unbounded_send(
                                            ResponsePartWithSidecar {
                                                response,
                                                sidecar: None,
                                            },
                                        );
                                    }
                                }
                            }
                        }
                        _ => {
                            tracing::error!(?response, "Unexpected response to upload submission");
                            continue;
                        }
                    },
                }
            }
            // Keep only the requests that did not get a final outcome during this turn
            upload_reqs.retain(|(_, _, final_sender, _, _)| final_sender.is_some());

            // Were there missing binaries? If yes, prepend them to the list of requests to retry, and upload them this way.
            if !missing_binaries.is_empty() {
                let db = db.clone();
                // Binaries that fail to load, or that are absent from the local database, are
                // silently left out of the upload
                let binaries = stream::iter(missing_binaries.into_iter())
                    .map(move |b| {
                        let db = db.clone();
                        async move { db.get_binary(b).await }
                    })
                    .buffer_unordered(16) // TODO(perf-low): is 16 a good number?
                    .filter_map(|res| async move { res.ok().and_then(|o| o) })
                    .collect::<Vec<Arc<[u8]>>>()
                    .await;
                upload_missing_binaries = Some(Arc::new(RequestWithSidecar {
                    request: Arc::new(Request::UploadBinaries(binaries.len())),
                    sidecar: binaries,
                }));
            }
        }
    }
}
675
676async fn undo_upload<C: crdb_core::Config, LocalDb: ClientSideDb>(
677    local_db: &CacheDb<LocalDb>,
678    upload: &Upload,
679) -> crate::Result<()> {
680    match upload {
681        Upload::Object { object_id, .. } => local_db.remove(*object_id).await,
682        Upload::Event {
683            object_id,
684            type_id,
685            event_id,
686            ..
687        } => match C::remove_event(local_db, *type_id, *object_id, *event_id).await {
688            Err(crate::Error::EventTooEarly { .. }) => {
689                // EventTooEarly means that the object has been recreated since the event was submitted
690                // In turn, this means that the server has pushed a re-creation update that was accepted
691                // As such, the event was already undone, by application of the update
692                Ok(())
693            }
694            res => res,
695        },
696    }
697}
698
699async fn do_upload<C: crdb_core::Config, LocalDb: ClientSideDb>(
700    local_db: &CacheDb<LocalDb>,
701    upload: &Upload,
702) -> crate::Result<()> {
703    match upload {
704        Upload::Object {
705            object_id,
706            type_id,
707            created_at,
708            snapshot_version,
709            object,
710            ..
711        } => {
712            C::create(
713                local_db,
714                *type_id,
715                *object_id,
716                *created_at,
717                *snapshot_version,
718                object,
719            )
720            .await
721        }
722        Upload::Event {
723            object_id,
724            type_id,
725            event_id,
726            event,
727            ..
728        } => C::submit(
729            local_db,
730            *type_id,
731            *object_id,
732            *event_id,
733            event,
734            None,
735            Importance::NONE,
736        )
737        .await
738        .map(|_| ()),
739    }
740}
741
742fn expect_simple_response(
743    mut response_receiver: mpsc::UnboundedReceiver<ResponsePartWithSidecar>,
744) -> oneshot::Receiver<crate::Result<()>> {
745    // TODO(perf-med): handle this like handle_upload_response: probably with a not-must-use wrapper and removing the crdb_core::spawn?
746    let (sender, receiver) = oneshot::channel();
747    waaaa::spawn(async move {
748        let Some(response) = response_receiver.next().await else {
749            let _ = sender.send(Err(crate::Error::Other(anyhow!(
750                "Connection thread went down too ealy"
751            ))));
752            return;
753        };
754        let _ = match response.response {
755            ResponsePart::Success => sender.send(Ok(())),
756            ResponsePart::Error(err) => sender.send(Err(err.into())),
757            _ => sender.send(Err(crate::Error::Other(anyhow!(
758                "Unexpected server response to DisconnectSession: {:?}",
759                response.response
760            )))),
761        };
762    });
763    receiver
764}