1use crate::{
2 client_db::SavedObject,
3 connection::{
4 Command, Connection, ConnectionEvent, RequestWithSidecar, ResponsePartWithSidecar,
5 ResponseSender,
6 },
7};
8use anyhow::anyhow;
9use crdb_cache::CacheDb;
10use crdb_core::{
11 BinPtr, ClientSideDb, CrdbSyncFn, Db, Event, EventId, Importance, MaybeObject, Object,
12 ObjectData, ObjectId, Query, QueryId, Request, ResponsePart, ResultExt, SavedQuery, Session,
13 SessionRef, SessionToken, Updatedness, Updates, Upload, UploadId,
14};
15use futures::{channel::mpsc, future::Either, pin_mut, stream, FutureExt, StreamExt};
16use std::{
17 collections::{HashMap, HashSet, VecDeque},
18 future::Future,
19 iter,
20 sync::{Arc, Mutex, RwLock},
21};
22use tokio::sync::{oneshot, watch};
23
/// Decision returned by the upload error handler when the server rejects an upload.
///
/// The upload resender applies the decision as follows.
#[non_exhaustive]
pub enum OnError {
    /// Undo the upload in the local database, then remove it from the upload queue.
    Rollback,
    /// Touch neither the local database nor the upload queue; the server's error is
    /// only forwarded to whoever submitted the upload.
    KeepLocal,
    /// Undo the upload in the local database, apply the provided upload locally in its
    /// place, then remove the rejected upload from the upload queue.
    ReplaceWith(Upload),
}
30
/// Client-side handle to the server API, backed by a local cache database.
///
/// Requests are not sent directly: they go through a background "upload resender"
/// task, which forwards them to the connection task.
pub struct ApiDb<LocalDb: ClientSideDb> {
    /// Channel for `Command`s (login, logout) sent to the connection task.
    connection: mpsc::UnboundedSender<Command>,
    /// Publishes the current list of queued upload ids; shared with the upload resender task.
    upload_queue_watcher_sender: Arc<Mutex<watch::Sender<Vec<UploadId>>>>,
    /// Receiving side of the upload-queue watch channel, cloned out by `watch_upload_queue`.
    upload_queue_watcher_receiver: watch::Receiver<Vec<UploadId>>,
    /// Local database; also stores the upload queue.
    db: Arc<CacheDb<LocalDb>>,
    /// Channel to the upload resender task. Each message carries the upload id (`None`
    /// for requests that are not uploads), the request itself, and the sender on which
    /// the responses must be delivered.
    upload_resender: mpsc::UnboundedSender<(
        Option<UploadId>,
        Arc<Request>,
        mpsc::UnboundedSender<ResponsePartWithSidecar>,
    )>,
    /// User callback invoked on every connection event; replaced by `on_connection_event`.
    connection_event_cb: Arc<RwLock<Box<dyn CrdbSyncFn<ConnectionEvent>>>>,
}
43
44impl<LocalDb: ClientSideDb> ApiDb<LocalDb> {
45 pub(crate) async fn new<C, GSO, GSQ, EH, EHF, RRL>(
46 db: Arc<CacheDb<LocalDb>>,
47 get_saved_objects: GSO,
48 get_saved_queries: GSQ,
49 error_handler: EH,
50 require_relogin: RRL,
51 ) -> crate::Result<(ApiDb<LocalDb>, mpsc::UnboundedReceiver<Updates>)>
52 where
53 C: crdb_core::Config,
54 GSO: 'static + waaaa::Send + FnMut() -> HashMap<ObjectId, SavedObject>,
55 GSQ: 'static + Send + FnMut() -> HashMap<QueryId, SavedQuery>,
56 EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
57 EHF: 'static + waaaa::Future<Output = OnError>,
58 RRL: 'static + waaaa::Send + Fn(),
59 {
60 let (update_sender, update_receiver) = mpsc::unbounded();
61 let connection_event_cb: Arc<RwLock<Box<dyn CrdbSyncFn<ConnectionEvent>>>> =
62 Arc::new(RwLock::new(Box::new(|_| ()) as _));
63 let event_cb = {
64 let connection_event_cb = connection_event_cb.clone();
65 Box::new(move |evt| {
66 let need_relogin = match evt {
67 ConnectionEvent::LoggingIn => false,
68 ConnectionEvent::FailedConnecting(_) => true,
69 ConnectionEvent::FailedSendingToken(_) => true,
70 ConnectionEvent::LostConnection(_) => false,
71 ConnectionEvent::InvalidToken(_) => true,
72 ConnectionEvent::Connected => false,
73 ConnectionEvent::TimeOffset(_) => false,
74 ConnectionEvent::LoggedOut => true,
75 };
76 if need_relogin {
77 (require_relogin)();
78 }
79 connection_event_cb.read().unwrap()(evt);
80 })
81 };
82 let (connection, commands) = mpsc::unbounded();
83 let (requests, requests_receiver) = mpsc::unbounded();
84 waaaa::spawn(
85 Connection::new(
86 commands,
87 requests_receiver,
88 event_cb,
89 update_sender,
90 get_saved_objects,
91 get_saved_queries,
92 )
93 .run(),
94 );
95 let all_uploads = db
96 .list_uploads()
97 .await
98 .wrap_context("listing upload queue")?;
99 let (upload_queue_watcher_sender, upload_queue_watcher_receiver) =
100 watch::channel(all_uploads.clone());
101 let upload_queue_watcher_sender = Arc::new(Mutex::new(upload_queue_watcher_sender));
102 let (upload_resender_sender, upload_resender_receiver) = mpsc::unbounded();
103 waaaa::spawn(upload_resender::<C, _, _, _>(
104 db.clone(),
105 upload_resender_receiver,
106 requests,
107 upload_queue_watcher_sender.clone(),
108 error_handler,
109 ));
110 for upload_id in all_uploads {
111 let upload = db
112 .get_upload(upload_id)
113 .await
114 .wrap_context("retrieving upload")?
115 .ok_or_else(|| {
116 crate::Error::Other(anyhow!(
117 "Upload vanished from queue while doing the initial read"
118 ))
119 })?;
120 let request = Arc::new(Request::Upload(upload));
121 let (sender, _) = mpsc::unbounded(); upload_resender_sender
123 .unbounded_send((Some(upload_id), request, sender))
124 .expect("connection cannot go away before apidb does");
125 }
126 Ok((
127 ApiDb {
128 db,
129 upload_queue_watcher_sender,
130 upload_queue_watcher_receiver,
131 connection,
132 upload_resender: upload_resender_sender,
133 connection_event_cb,
134 },
135 update_receiver,
136 ))
137 }
138
139 pub fn watch_upload_queue(&self) -> watch::Receiver<Vec<UploadId>> {
140 self.upload_queue_watcher_receiver.clone()
141 }
142
143 pub fn on_connection_event(&self, cb: impl 'static + CrdbSyncFn<ConnectionEvent>) {
144 *self.connection_event_cb.write().unwrap() = Box::new(cb);
145 }
146
147 pub fn login(&self, url: Arc<String>, token: SessionToken) {
148 self.connection
149 .unbounded_send(Command::Login { url, token })
150 .expect("connection cannot go away before sender does")
151 }
152
153 pub fn logout(&self) {
155 self.connection
156 .unbounded_send(Command::Logout)
157 .expect("connection cannot go away before sender does")
158 }
159
160 fn request(&self, request: Arc<Request>) -> mpsc::UnboundedReceiver<ResponsePartWithSidecar> {
161 let (sender, response) = mpsc::unbounded();
162 self.upload_resender
163 .unbounded_send((None, request, sender))
164 .expect("connection cannot go away before sender does");
165 response
166 }
167
168 pub fn rename_session(&self, name: String) -> oneshot::Receiver<crate::Result<()>> {
169 let response_receiver = self.request(Arc::new(Request::RenameSession(name)));
170 expect_simple_response(response_receiver)
171 }
172
173 pub async fn current_session(&self) -> crate::Result<Session> {
174 let response = self
175 .request(Arc::new(Request::CurrentSession))
176 .next()
177 .await
178 .ok_or_else(|| crate::Error::Other(anyhow!("Connection thread went down too early")))?;
179 match response.response {
180 ResponsePart::Sessions(mut sessions) if sessions.len() == 1 => {
181 Ok(sessions.pop().unwrap())
182 }
183 ResponsePart::Error(err) => Err(err.into()),
184 _ => Err(crate::Error::Other(anyhow!(
185 "Unexpected server response to CurrentSession: {:?}",
186 response.response
187 ))),
188 }
189 }
190
191 pub async fn list_sessions(&self) -> crate::Result<Vec<Session>> {
192 let response = self
193 .request(Arc::new(Request::ListSessions))
194 .next()
195 .await
196 .ok_or_else(|| crate::Error::Other(anyhow!("Connection thread went down too early")))?;
197 match response.response {
198 ResponsePart::Sessions(sessions) => Ok(sessions),
199 ResponsePart::Error(err) => Err(err.into()),
200 _ => Err(crate::Error::Other(anyhow!(
201 "Unexpected server response to ListSessions: {:?}",
202 response.response
203 ))),
204 }
205 }
206
207 pub fn disconnect_session(
208 &self,
209 session_ref: SessionRef,
210 ) -> oneshot::Receiver<crate::Result<()>> {
211 let response_receiver = self.request(Arc::new(Request::DisconnectSession(session_ref)));
212 expect_simple_response(response_receiver)
213 }
214
215 pub fn unsubscribe(&self, object_ids: HashSet<ObjectId>) {
216 self.request(Arc::new(Request::Unsubscribe(object_ids)));
217 }
219
220 pub fn unsubscribe_query(&self, query_id: QueryId) {
221 self.request(Arc::new(Request::UnsubscribeQuery(query_id)));
222 }
224
225 async fn handle_upload_response(
226 mut receiver: mpsc::UnboundedReceiver<ResponsePartWithSidecar>,
227 ) -> crate::Result<()> {
228 match receiver.next().await {
229 None => Err(crate::Error::Other(anyhow!(
230 "Connection did not return any answer to query"
231 ))),
232 Some(ResponsePartWithSidecar {
233 sidecar: Some(_), ..
234 }) => Err(crate::Error::Other(anyhow!(
235 "Connection returned sidecar while we expected a simple result"
236 ))),
237 Some(ResponsePartWithSidecar { response, .. }) => match response {
238 ResponsePart::Success => Ok(()),
239 ResponsePart::Error(err) => Err(err.into()),
240 ResponsePart::Sessions(_)
241 | ResponsePart::CurrentTime(_)
242 | ResponsePart::Objects { .. }
243 | ResponsePart::Binaries(_) => Err(crate::Error::Other(anyhow!(
244 "Connection returned unexpected answer while expecting a simple result"
245 ))),
246 },
247 }
248 }
249
250 pub async fn create<T: Object>(
251 &self,
252 object_id: ObjectId,
253 created_at: EventId,
254 object: Arc<T>,
255 subscribe: bool,
256 ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
257 let required_binaries = object.required_binaries();
258 let upload = Upload::Object {
259 object_id,
260 type_id: *T::type_ulid(),
261 created_at,
262 snapshot_version: T::snapshot_version(),
263 object: Arc::new(
264 serde_json::to_value(object)
265 .wrap_context("serializing object for sending to api")?,
266 ),
267 subscribe,
268 };
269 let request = Arc::new(Request::Upload(upload.clone()));
270 let (result_sender, result_receiver) = mpsc::unbounded();
271 let upload_id = self
272 .db
273 .enqueue_upload(upload, required_binaries)
274 .await
275 .wrap_context("enqueuing upload")?;
276 let upload_list = self
277 .db
278 .list_uploads()
279 .await
280 .wrap_context("listing uploads")?;
281 self.upload_queue_watcher_sender
282 .lock()
283 .unwrap()
284 .send_replace(upload_list);
285 self.upload_resender
286 .unbounded_send((Some(upload_id), request, result_sender))
287 .map_err(|_| crate::Error::Other(anyhow!("Upload resender went out too early")))?;
288 Ok(Self::handle_upload_response(result_receiver))
289 }
290
291 pub async fn submit<T: Object>(
292 &self,
293 object_id: ObjectId,
294 event_id: EventId,
295 event: Arc<T::Event>,
296 subscribe: bool,
297 ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
298 let required_binaries = event.required_binaries();
299 let upload = Upload::Event {
300 object_id,
301 type_id: *T::type_ulid(),
302 event_id,
303 event: Arc::new(
304 serde_json::to_value(event).wrap_context("serializing event for sending to api")?,
305 ),
306 subscribe,
307 };
308 let request = Arc::new(Request::Upload(upload.clone()));
309 let (result_sender, result_receiver) = mpsc::unbounded();
310 let upload_id = self
311 .db
312 .enqueue_upload(upload, required_binaries)
313 .await
314 .wrap_context("enqueuing upload")?;
315 let upload_list = self
316 .db
317 .list_uploads()
318 .await
319 .wrap_context("listing uploads")?;
320 self.upload_queue_watcher_sender
321 .lock()
322 .unwrap()
323 .send_replace(upload_list);
324 self.upload_resender
325 .unbounded_send((Some(upload_id), request, result_sender))
326 .map_err(|_| crate::Error::Other(anyhow!("Upload resender went out too early")))?;
327 Ok(Self::handle_upload_response(result_receiver))
328 }
329
330 pub async fn get(&self, object_id: ObjectId, subscribe: bool) -> crate::Result<ObjectData> {
331 let mut object_ids = HashMap::new();
332 object_ids.insert(object_id, None); let request = Arc::new(Request::Get {
334 object_ids,
335 subscribe,
336 });
337 let mut response = self.request(request);
338 match response.next().await {
339 None => Err(crate::Error::Other(anyhow!(
340 "Connection-handling thread went out before ApiDb"
341 ))),
342 Some(response) => match response.response {
343 ResponsePart::Error(err) => Err(err.into()),
344 ResponsePart::Objects { mut data, .. } if data.len() == 1 => {
345 match data.pop().unwrap() {
346 MaybeObject::AlreadySubscribed(_) => Err(crate::Error::Other(anyhow!(
347 "Server unexpectedly told us we already know unknown {object_id:?}"
348 ))),
349 MaybeObject::NotYetSubscribed(res) => Ok(res),
350 }
351 }
352 _ => Err(crate::Error::Other(anyhow!(
353 "Unexpected response to GetSubscribe request: {:?}",
354 response.response
355 ))),
356 },
357 }
358 }
359
360 pub fn query<T: Object>(
361 &self,
362 query_id: QueryId,
363 only_updated_since: Option<Updatedness>,
364 subscribe: bool,
365 query: Arc<Query>,
366 ) -> impl waaaa::Stream<Item = crate::Result<(MaybeObject, Option<Updatedness>)>> {
367 let request = Arc::new(Request::Query {
368 query_id,
369 type_id: *T::type_ulid(),
370 query,
371 only_updated_since,
372 subscribe,
373 });
374 self.request(request).flat_map(move |response| {
375 match response.response {
376 ResponsePart::Error(err) => Either::Left(stream::iter(iter::once(Err(err.into())))),
378 ResponsePart::Objects {
379 data,
380 now_have_all_until,
381 } => {
382 let data_len = data.len();
383 Either::Right(stream::iter(data.into_iter().enumerate().map(
384 move |(i, d)| {
385 let now_have_all_until = if i + 1 == data_len {
386 now_have_all_until
387 } else {
388 None
389 };
390 Ok((d, now_have_all_until))
391 },
392 )))
393 }
394 resp => Either::Left(stream::iter(iter::once(Err(crate::Error::Other(anyhow!(
395 "Server gave unexpected answer to QuerySubscribe request: {resp:?}"
396 )))))),
397 }
398 })
399 }
400
401 pub async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
402 let mut binary_ids = HashSet::new();
403 binary_ids.insert(binary_id);
404 let request = Arc::new(Request::GetBinaries(binary_ids));
405 let mut response = self.request(request);
406 match response.next().await {
407 None => Err(crate::Error::Other(anyhow!(
408 "Connection-handling thread went out before ApiDb"
409 ))),
410 Some(response) => match response.response {
411 ResponsePart::Error(err) => Err(err.into()),
412 ResponsePart::Binaries(1) => {
413 let bin = response.sidecar.ok_or_else(|| {
414 crate::Error::Other(anyhow!(
415 "Connection thread claimed to send us one binary but actually did not"
416 ))
417 })?;
418 Ok(Some(bin))
419 }
420 _ => Err(crate::Error::Other(anyhow!(
421 "Unexpected response to get-binary request: {:?}",
422 response.response
423 ))),
424 },
425 }
426 }
427}
428
429async fn upload_resender<C, LocalDb, EH, EHF>(
430 db: Arc<CacheDb<LocalDb>>,
431 requests: mpsc::UnboundedReceiver<(
432 Option<UploadId>,
433 Arc<Request>,
434 mpsc::UnboundedSender<ResponsePartWithSidecar>,
435 )>,
436 connection: mpsc::UnboundedSender<(ResponseSender, Arc<RequestWithSidecar>)>,
437 upload_queue_watcher_sender: Arc<Mutex<watch::Sender<Vec<UploadId>>>>,
438 error_handler: EH,
439) where
440 C: crdb_core::Config,
441 LocalDb: ClientSideDb,
442 EH: 'static + waaaa::Send + Fn(Upload, crate::Error) -> EHF,
443 EHF: 'static + waaaa::Future<Output = OnError>,
444{
445 let requests = requests.peekable();
450 pin_mut!(requests);
451 macro_rules! poll_next_if {
452 ($cond:expr) => {
453 requests
454 .as_mut()
455 .peek()
456 .now_or_never()
457 .and_then(|req| req)
458 .map($cond)
459 .unwrap_or(false)
460 };
461 }
462 while requests.as_mut().peek().await.is_some() {
463 while poll_next_if!(|(id, _, _)| id.is_none()) {
465 let (upload_id, request, sender) = requests.next().await.unwrap();
466 tracing::trace!(?request, "resender received non-upload request");
467 assert!(upload_id.is_none(), "non-upload should not have an id");
468 let _ = connection.unbounded_send((
469 sender,
470 Arc::new(RequestWithSidecar {
471 request,
472 sidecar: Vec::new(),
473 }),
474 ));
475 }
476
477 let mut upload_reqs = VecDeque::new();
479 while poll_next_if!(|(id, _, _)| id.is_some()) {
480 let (upload_id, request, final_sender) = requests.next().await.unwrap();
481 let upload_id = upload_id.unwrap();
482 tracing::trace!(?upload_id, ?request, "resender received upload request");
483 let (sender, receiver) = mpsc::unbounded();
484 upload_reqs.push_back((
485 upload_id,
486 Arc::new(RequestWithSidecar {
487 request,
488 sidecar: Vec::new(),
489 }),
490 Some(final_sender),
491 sender,
492 receiver,
493 ));
494 }
495 let mut upload_missing_binaries = None;
496 while !upload_reqs.is_empty() {
497 if let Some(upload_missing_binaries) = upload_missing_binaries.take() {
500 let (sender, _) = mpsc::unbounded();
501 let _ = connection.unbounded_send((sender, upload_missing_binaries));
502 }
503
504 for (_, request, _, sender, _) in upload_reqs.iter() {
506 let _ = connection.unbounded_send((sender.clone(), request.clone()));
507 }
508
509 let mut missing_binaries = HashSet::new();
512 for (upload_id, request, final_sender, _, receiver) in upload_reqs.iter_mut() {
513 match receiver.next().await {
514 None => return, Some(ResponsePartWithSidecar {
516 sidecar: Some(_), ..
517 }) => {
518 tracing::error!("got response to upload that had a sidecar");
519 continue;
520 }
521 Some(ResponsePartWithSidecar { response, .. }) => match response {
522 ResponsePart::Success => {
523 if let Err(err) = db.upload_finished(*upload_id).await {
524 tracing::error!(?err, "failed dequeuing upload");
525 } else {
526 match db.list_uploads().await.wrap_context("listing uploads") {
527 Err(err) => {
528 tracing::error!(?err, "failed listing upload queue");
529 }
530 Ok(upload_list) => {
531 upload_queue_watcher_sender
532 .lock()
533 .unwrap()
534 .send_replace(upload_list);
535 }
536 }
537 let _ = final_sender.take().unwrap().unbounded_send(
538 ResponsePartWithSidecar {
539 response,
540 sidecar: None,
541 },
542 );
543 }
544 }
545 ResponsePart::Error(crdb_core::SerializableError::MissingBinaries(
546 bins,
547 )) => {
548 missing_binaries.extend(bins);
549 }
550 ResponsePart::Error(crdb_core::SerializableError::ObjectDoesNotExist(
551 _,
552 )) if !missing_binaries.is_empty() => {
553 }
557 ResponsePart::Error(ref err) => {
558 let Request::Upload(upload) = &*request.request else {
559 panic!("is_upload == true but does not match Upload");
560 };
561 match error_handler((*upload).clone(), (*err).clone().into()).await {
562 OnError::Rollback => {
563 if let Err(err) = undo_upload::<C, _>(&db, upload).await {
564 tracing::error!(?err, ?upload, "failed undoing upload");
565 } else if let Err(err) = db.upload_finished(*upload_id).await {
566 tracing::error!(?err, "failed dequeuing upload");
567 } else {
568 match db
569 .list_uploads()
570 .await
571 .wrap_context("listing uploads")
572 {
573 Err(err) => {
574 tracing::error!(
575 ?err,
576 "failed listing upload queue"
577 );
578 }
579 Ok(upload_list) => {
580 upload_queue_watcher_sender
581 .lock()
582 .unwrap()
583 .send_replace(upload_list);
584 }
585 }
586 let _ = final_sender.take().unwrap().unbounded_send(
587 ResponsePartWithSidecar {
588 response,
589 sidecar: None,
590 },
591 );
592 }
593 }
594 OnError::KeepLocal => {
595 let _ = final_sender.take().unwrap().unbounded_send(
598 ResponsePartWithSidecar {
599 response,
600 sidecar: None,
601 },
602 );
603 }
604 OnError::ReplaceWith(new_upload) => {
605 if let Err(err) = undo_upload::<C, _>(&db, upload).await {
606 tracing::error!(?err, ?upload, "failed undoing upload");
607 } else if let Err(err) =
608 do_upload::<C, _>(&db, &new_upload).await
609 {
610 tracing::error!(
611 ?err,
612 ?new_upload,
613 "failed doing replacement upload"
614 );
615 } else if let Err(err) = db.upload_finished(*upload_id).await {
616 tracing::error!(?err, "failed dequeuing upload");
617 } else {
618 match db
619 .list_uploads()
620 .await
621 .wrap_context("listing uploads")
622 {
623 Err(err) => {
624 tracing::error!(
625 ?err,
626 "failed listing upload queue"
627 );
628 }
629 Ok(upload_list) => {
630 upload_queue_watcher_sender
631 .lock()
632 .unwrap()
633 .send_replace(upload_list);
634 }
635 }
636 let _ = final_sender.take().unwrap().unbounded_send(
637 ResponsePartWithSidecar {
638 response,
639 sidecar: None,
640 },
641 );
642 }
643 }
644 }
645 }
646 _ => {
647 tracing::error!(?response, "Unexpected response to upload submission");
648 continue;
649 }
650 },
651 }
652 }
653 upload_reqs.retain(|(_, _, final_sender, _, _)| final_sender.is_some());
654
655 if !missing_binaries.is_empty() {
657 let db = db.clone();
658 let binaries = stream::iter(missing_binaries.into_iter())
659 .map(move |b| {
660 let db = db.clone();
661 async move { db.get_binary(b).await }
662 })
663 .buffer_unordered(16) .filter_map(|res| async move { res.ok().and_then(|o| o) })
665 .collect::<Vec<Arc<[u8]>>>()
666 .await;
667 upload_missing_binaries = Some(Arc::new(RequestWithSidecar {
668 request: Arc::new(Request::UploadBinaries(binaries.len())),
669 sidecar: binaries,
670 }));
671 }
672 }
673 }
674}
675
676async fn undo_upload<C: crdb_core::Config, LocalDb: ClientSideDb>(
677 local_db: &CacheDb<LocalDb>,
678 upload: &Upload,
679) -> crate::Result<()> {
680 match upload {
681 Upload::Object { object_id, .. } => local_db.remove(*object_id).await,
682 Upload::Event {
683 object_id,
684 type_id,
685 event_id,
686 ..
687 } => match C::remove_event(local_db, *type_id, *object_id, *event_id).await {
688 Err(crate::Error::EventTooEarly { .. }) => {
689 Ok(())
693 }
694 res => res,
695 },
696 }
697}
698
699async fn do_upload<C: crdb_core::Config, LocalDb: ClientSideDb>(
700 local_db: &CacheDb<LocalDb>,
701 upload: &Upload,
702) -> crate::Result<()> {
703 match upload {
704 Upload::Object {
705 object_id,
706 type_id,
707 created_at,
708 snapshot_version,
709 object,
710 ..
711 } => {
712 C::create(
713 local_db,
714 *type_id,
715 *object_id,
716 *created_at,
717 *snapshot_version,
718 object,
719 )
720 .await
721 }
722 Upload::Event {
723 object_id,
724 type_id,
725 event_id,
726 event,
727 ..
728 } => C::submit(
729 local_db,
730 *type_id,
731 *object_id,
732 *event_id,
733 event,
734 None,
735 Importance::NONE,
736 )
737 .await
738 .map(|_| ()),
739 }
740}
741
742fn expect_simple_response(
743 mut response_receiver: mpsc::UnboundedReceiver<ResponsePartWithSidecar>,
744) -> oneshot::Receiver<crate::Result<()>> {
745 let (sender, receiver) = oneshot::channel();
747 waaaa::spawn(async move {
748 let Some(response) = response_receiver.next().await else {
749 let _ = sender.send(Err(crate::Error::Other(anyhow!(
750 "Connection thread went down too ealy"
751 ))));
752 return;
753 };
754 let _ = match response.response {
755 ResponsePart::Success => sender.send(Ok(())),
756 ResponsePart::Error(err) => sender.send(Err(err.into())),
757 _ => sender.send(Err(crate::Error::Other(anyhow!(
758 "Unexpected server response to DisconnectSession: {:?}",
759 response.response
760 )))),
761 };
762 });
763 receiver
764}