use crate::{v2::params::SubscriptionId, Error};
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;

/// Active subscription on a Client.
pub struct Subscription<Notif> {
    /// Channel to send requests to the background task.
    pub to_back: mpsc::Sender<FrontToBack>,
    /// Channel from which we receive notifications from the server, as encoded `JsonValue`s.
    pub notifs_rx: mpsc::Receiver<JsonValue>,
    /// Subscription ID.
    pub id: SubscriptionId,
    /// Marker in order to pin the `Notif` parameter.
    pub marker: PhantomData<Notif>,
}

/// Batch request message.
#[derive(Debug)]
pub struct BatchMessage {
    /// Serialized batch request.
    pub raw: String,
    /// Request IDs.
    pub ids: Vec<u64>,
    /// One-shot channel over which we send back the result of this request.
    pub send_back: oneshot::Sender<Result<Vec<JsonValue>, Error>>,
}

/// Request message.
#[derive(Debug)]
pub struct RequestMessage {
    /// Serialized message.
    pub raw: String,
    /// Request ID.
    pub id: u64,
    /// One-shot channel over which we send back the result of this request.
    pub send_back: Option<oneshot::Sender<Result<JsonValue, Error>>>,
}

/// Subscription message.
#[derive(Debug)]
pub struct SubscriptionMessage {
    /// Serialized message.
    pub raw: String,
    /// Request ID of the subscribe message.
    pub subscribe_id: u64,
    /// Request ID of the unsubscribe message.
    pub unsubscribe_id: u64,
    /// Method to use to unsubscribe later. Used if the channel unexpectedly closes.
    pub unsubscribe_method: String,
    /// If the subscription succeeds, we return a [`mpsc::Receiver`] that will receive notifications.
    /// When we get a response from the server about that subscription, we send the result over
    /// this channel.
    pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>,
}

/// Message that the Client can send to the background task.
#[derive(Debug)]
pub enum FrontToBack {
    /// Send a batch request to the server.
    Batch(BatchMessage),
    /// Send a notification to the server.
    Notification(String),
    /// Send a request to the server.
    Request(RequestMessage),
    /// Send a subscription request to the server.
    Subscribe(SubscriptionMessage),
    /// When a subscription channel is closed, we send this message to the background
    /// task to mark it ready for garbage collection.
    // NOTE: It is not possible to cancel pending subscriptions or pending requests.
    // Such operations will be blocked until a response is received or the background
    // thread has been terminated.
    SubscriptionClosed(SubscriptionId),
}

impl<Notif> Subscription<Notif>
where
    Notif: DeserializeOwned,
{
    /// Returns the next notification from the stream.
    ///
    /// Returns `None` if the subscription has been terminated, which may happen
    /// if the channel becomes full or is dropped.
    ///
    /// Malformed notifications are logged at debug level and skipped.
    pub async fn next(&mut self) -> Option<Notif> {
        loop {
            match self.notifs_rx.next().await {
                Some(n) => match serde_json::from_value(n) {
                    Ok(parsed) => return Some(parsed),
                    Err(e) => log::debug!("Subscription response error: {:?}", e),
                },
                None => return None,
            }
        }
    }
}

impl<Notif> Drop for Subscription<Notif> {
    fn drop(&mut self) {
        // We can't actually guarantee that this goes through. If the background task is busy, then
        // the channel's buffer will be full, and our unsubscription request will never make it.
        // However, when a notification arrives, the background task will realize that the channel
        // to the `Subscription` has been closed, and will perform the unsubscribe.
        let id = std::mem::replace(&mut self.id, SubscriptionId::Num(0));
        let _ = self.to_back.send(FrontToBack::SubscriptionClosed(id)).now_or_never();
    }
}
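
The best-effort behaviour described in the `Drop` comment can be illustrated with a small standalone sketch. This is not the crate's implementation: it swaps `futures_channel::mpsc` and `send(..).now_or_never()` for the standard library's `std::sync::mpsc::sync_channel` and `try_send`, and the names `Handle`, `ToBack`, and `received_after_two_drops` are hypothetical stand-ins for `Subscription` and `FrontToBack`. It only shows that a non-blocking send from `drop` is silently lost when the buffer is already full.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Hypothetical stand-in for `FrontToBack::SubscriptionClosed`.
#[derive(Debug, PartialEq)]
enum ToBack {
    SubscriptionClosed(u64),
}

/// Hypothetical stand-in for `Subscription`.
struct Handle {
    to_back: SyncSender<ToBack>,
    id: u64,
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Best effort: `try_send` never blocks. If the buffer is full or the
        // receiver is gone, the error is ignored and the message is lost.
        let _ = self.to_back.try_send(ToBack::SubscriptionClosed(self.id));
    }
}

/// Drops two handles on a channel with room for one message and returns
/// whatever the receiving side can still read afterwards.
fn received_after_two_drops() -> Vec<ToBack> {
    let (tx, rx) = sync_channel(1);
    drop(Handle { to_back: tx.clone(), id: 1 });
    // The buffer is already full here, so this close message is discarded.
    drop(Handle { to_back: tx.clone(), id: 2 });
    rx.try_iter().collect()
}

fn main() {
    let received = received_after_two_drops();
    assert_eq!(received, vec![ToBack::SubscriptionClosed(1)]);
    println!("{:?}", received);
}
```

Because the second close message never arrives, the receiving side needs another way to notice the drop. In the code above, the background task does so when it next tries to deliver a notification and finds the channel to the `Subscription` closed.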