convex/client/
mod.rs

1use std::{
2    collections::BTreeMap,
3    convert::Infallible,
4    sync::Arc,
5};
6
7use convex_sync_types::{
8    AuthenticationToken,
9    UdfPath,
10    UserIdentityAttributes,
11};
12#[cfg(doc)]
13use futures::Stream;
14use futures::StreamExt;
15use tokio::{
16    sync::{
17        broadcast,
18        mpsc,
19        oneshot,
20    },
21    task::JoinHandle,
22};
23use tokio_stream::wrappers::BroadcastStream;
24use url::Url;
25
26use self::worker::AuthenticateRequest;
27#[cfg(doc)]
28use crate::SubscriberId;
29use crate::{
30    base_client::{
31        BaseConvexClient,
32        QueryResults,
33    },
34    client::{
35        subscription::{
36            QuerySetSubscription,
37            QuerySubscription,
38        },
39        worker::{
40            worker,
41            ActionRequest,
42            ClientRequest,
43            MutationRequest,
44            SubscribeRequest,
45        },
46    },
47    sync::{
48        web_socket_manager::WebSocketManager,
49        SyncProtocol,
50        WebSocketState,
51    },
52    value::Value,
53    FunctionResult,
54};
55
56pub mod subscription;
57mod worker;
58
59const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION");
60
61/// An asynchronous client to interact with a specific project to perform
62/// mutations and manage query subscriptions using [`tokio`].
63///
64/// The Convex client requires a deployment url,
65/// which can be found in the [dashboard](https://dashboard.convex.dev/) settings tab.
66///
67/// ```no_run
68/// use convex::ConvexClient;
69/// use futures::StreamExt;
70///
71/// #[tokio::main]
72/// async fn main() -> anyhow::Result<()> {
73///     let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
74///     let mut sub = client.subscribe("listMessages", maplit::btreemap!{}).await?;
75///     while let Some(result) = sub.next().await {
76///         println!("{result:?}");
77///     }
78///     Ok(())
79/// }
80/// ```
81///
82/// The [`ConvexClient`] internally holds a connection and a [`tokio`]
83/// background task to manage it. It is advised that you create one and
84/// **reuse** it. You can safely clone with [`ConvexClient::clone()`] to share
85/// the connection and outstanding subscriptions.
86///
87/// ## Examples
88/// For example code, please refer to the examples directory.
89pub struct ConvexClient {
90    listen_handle: Option<Arc<JoinHandle<Infallible>>>,
91    request_sender: mpsc::UnboundedSender<ClientRequest>,
92    watch_receiver: broadcast::Receiver<QueryResults>,
93}
94
95/// Clone the [`ConvexClient`], sharing the connection and outstanding
96/// subscriptions.
97impl Clone for ConvexClient {
98    fn clone(&self) -> Self {
99        Self {
100            listen_handle: self.listen_handle.clone(),
101            request_sender: self.request_sender.clone(),
102            watch_receiver: self.watch_receiver.resubscribe(),
103        }
104    }
105}
106
107/// Drop the [`ConvexClient`]. When the final reference to the [`ConvexClient`]
108/// is dropped, the connection is cleaned up.
109impl Drop for ConvexClient {
110    fn drop(&mut self) {
111        if let Ok(j_handle) = Arc::try_unwrap(
112            self.listen_handle
113                .take()
114                .expect("INTERNAL BUG: listen handle should never be none"),
115        ) {
116            j_handle.abort()
117        }
118    }
119}
120
121impl ConvexClient {
122    /// Constructs a new client for communicating with `deployment_url`.
123    ///
124    /// ```no_run
125    /// # use convex::ConvexClient;
126    /// # #[tokio::main]
127    /// # async fn main() -> anyhow::Result<()> {
128    /// let client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
129    /// # Ok(())
130    /// # }
131    /// ```
132    pub async fn new(deployment_url: &str) -> anyhow::Result<Self> {
133        ConvexClient::new_from_builder(ConvexClientBuilder::new(deployment_url)).await
134    }
135
136    #[doc(hidden)]
137    pub async fn new_from_builder(builder: ConvexClientBuilder) -> anyhow::Result<Self> {
138        let client_id = builder
139            .client_id
140            .unwrap_or_else(|| format!("rust-{}", VERSION.unwrap_or("unknown")));
141        let ws_url = deployment_to_ws_url(builder.deployment_url.as_str().try_into()?)?;
142
143        // Channels for the `listen` background thread
144        let (response_sender, response_receiver) = mpsc::channel(1);
145        let (request_sender, request_receiver) = mpsc::unbounded_channel();
146
147        // Listener for when each transaction completes
148        let (watch_sender, watch_receiver) = broadcast::channel(1);
149
150        let base_client = BaseConvexClient::new();
151
152        let protocol = WebSocketManager::open(
153            ws_url,
154            response_sender,
155            builder.on_state_change,
156            client_id.as_str(),
157        )
158        .await?;
159
160        let listen_handle = tokio::spawn(worker(
161            response_receiver,
162            request_receiver,
163            watch_sender,
164            base_client,
165            protocol,
166        ));
167        let client = ConvexClient {
168            listen_handle: Some(Arc::new(listen_handle)),
169            request_sender,
170            watch_receiver,
171        };
172        Ok(client)
173    }
174
175    /// Subscribe to the results of query `name` called with `args`.
176    ///
177    /// Returns a [`QuerySubscription`] which implements [`Stream`]<
178    /// [`FunctionResult`]>. A new value appears on the stream each
179    /// time the query function produces a new result.
180    ///
181    /// The subscription is automatically unsubscribed when it is dropped.
182    ///
183    /// ```no_run
184    /// # use convex::ConvexClient;
185    /// # use futures::StreamExt;
186    /// # #[tokio::main]
187    /// # async fn main() -> anyhow::Result<()> {
188    /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
189    /// let mut sub = client.subscribe("listMessages", maplit::btreemap!{}).await?;
190    /// while let Some(result) = sub.next().await {
191    ///     println!("{result:?}");
192    /// }
193    /// # Ok(())
194    /// # }
195    pub async fn subscribe(
196        &mut self,
197        name: &str,
198        args: BTreeMap<String, Value>,
199    ) -> anyhow::Result<QuerySubscription> {
200        let (tx, rx) = oneshot::channel();
201
202        let udf_path = name.parse()?;
203        let request = SubscribeRequest { udf_path, args };
204
205        self.request_sender.send(ClientRequest::Subscribe(
206            request,
207            tx,
208            self.request_sender.clone(),
209        ))?;
210
211        let res = rx.await?;
212        Ok(res)
213    }
214
215    /// Make a oneshot request to a query `name` with `args`.
216    ///
217    /// Returns a [`FunctionResult`] representing the result of the query.
218    ///
219    /// This method is syntactic sugar for waiting for a single result on
220    /// a subscription.
221    /// It is equivalent to `client.subscribe(name,
222    /// args).await?.next().unwrap()`
223    ///
224    /// ```no_run
225    /// # use convex::ConvexClient;
226    /// # use futures::StreamExt;
227    /// # #[tokio::main]
228    /// # async fn main() -> anyhow::Result<()> {
229    /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
230    /// let result = client.query("listMessages", maplit::btreemap!{}).await?;
231    /// println!("{result:?}");
232    /// # Ok(())
233    /// # }
234    pub async fn query(
235        &mut self,
236        name: &str,
237        args: BTreeMap<String, Value>,
238    ) -> anyhow::Result<FunctionResult> {
239        Ok(self
240            .subscribe(name, args)
241            .await?
242            .next()
243            .await
244            .expect("INTERNAL BUG: Convex Client dropped prematurely."))
245    }
246
247    /// Perform a mutation `name` with `args` and return a future
248    /// containing the return value of the mutation once it completes.
249    ///
250    /// ```no_run
251    /// # use convex::ConvexClient;
252    /// # use futures::StreamExt;
253    /// # #[tokio::main]
254    /// # async fn main() -> anyhow::Result<()> {
255    /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
256    /// let result = client.mutation("sendMessage", maplit::btreemap!{
257    ///     "body".into() => "Let it be.".into(),
258    ///     "author".into() => "The Beatles".into(),
259    /// }).await?;
260    /// println!("{result:?}");
261    /// # Ok(())
262    /// # }
263    pub async fn mutation(
264        &mut self,
265        name: &str,
266        args: BTreeMap<String, Value>,
267    ) -> anyhow::Result<FunctionResult> {
268        let (tx, rx) = oneshot::channel();
269
270        let udf_path: UdfPath = name.parse()?;
271        let request = MutationRequest { udf_path, args };
272
273        self.request_sender
274            .send(ClientRequest::Mutation(request, tx))?;
275
276        let res = rx.await?;
277        Ok(res.await?)
278    }
279
280    /// Perform an action `name` with `args` and return a future
281    /// containing the return value of the action once it completes.
282    ///
283    /// ```no_run
284    /// # use convex::ConvexClient;
285    /// # use futures::StreamExt;
286    /// # #[tokio::main]
287    /// # async fn main() -> anyhow::Result<()> {
288    /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
289    /// let result = client.action("sendGif", maplit::btreemap!{
290    ///     "body".into() => "Tatooine Sunrise.".into(),
291    ///     "author".into() => "Luke Skywalker".into(),
292    /// }).await?;
293    /// println!("{result:?}");
294    /// # Ok(())
295    /// # }
296    pub async fn action(
297        &mut self,
298        name: &str,
299        args: BTreeMap<String, Value>,
300    ) -> anyhow::Result<FunctionResult> {
301        let (tx, rx) = oneshot::channel();
302
303        let udf_path: UdfPath = name.parse()?;
304        let request = ActionRequest { udf_path, args };
305
306        self.request_sender
307            .send(ClientRequest::Action(request, tx))?;
308
309        let res = rx.await?;
310        Ok(res.await?)
311    }
312
313    /// Get a consistent view of the results of multiple queries (query set).
314    ///
315    /// Returns a [`QuerySetSubscription`] which
316    /// implements [`Stream`]<[`QueryResults`]>.
317    /// Each item in the stream contains a consistent view
318    /// of the results of all the queries in the query set.
319    ///
320    /// Queries can be added to the query set via [`ConvexClient::subscribe`].
321    /// Queries can be removed from the query set via dropping the
322    /// [`QuerySubscription`] token returned by [`ConvexClient::subscribe`].
323    ///
324    ///
325    /// [`QueryResults`] is a copy-on-write mapping from [`SubscriberId`] to
326    /// its latest result [`Value`].
327    ///
328    /// ```no_run
329    /// # use convex::ConvexClient;
330    /// # use futures::StreamExt;
331    /// # #[tokio::main]
332    /// # async fn main() -> anyhow::Result<()> {
333    /// let mut client = ConvexClient::new("https://cool-music-123.convex.cloud").await?;
334    /// let mut watch = client.watch_all();
335    /// let sub1 = client.subscribe("listMessages", maplit::btreemap!{
336    ///     "channel".into() => 1.into(),
337    /// }).await?;
338    /// let sub2 = client.subscribe("listMessages", maplit::btreemap!{
339    ///     "channel".into() => 1.into(),
340    /// }).await?;
341    /// # Ok(())
342    /// # }
343    pub fn watch_all(&self) -> QuerySetSubscription {
344        QuerySetSubscription::new(BroadcastStream::new(self.watch_receiver.resubscribe()))
345    }
346
347    /// Set auth for use when calling Convex functions.
348    ///
349    /// Set it with a token that you get from your auth provider via their login
350    /// flow. If `None` is passed as the token, then auth is unset (logging
351    /// out).
352    pub async fn set_auth(&mut self, token: Option<String>) {
353        let req = AuthenticateRequest {
354            token: match token {
355                None => AuthenticationToken::None,
356                Some(token) => AuthenticationToken::User(token),
357            },
358        };
359        self.request_sender
360            .send(ClientRequest::Authenticate(req))
361            .expect("INTERNAL BUG: Worker has gone away");
362    }
363
364    /// Set admin auth for use when calling Convex functions as a deployment
365    /// admin. Not typically required.
366    ///
367    /// You can get a deploy_key from the Convex dashboard's deployment settings
368    /// page. Deployment admins can act as users as part of their
369    /// development flow to see how a function would act.
370    #[doc(hidden)]
371    pub async fn set_admin_auth(
372        &mut self,
373        deploy_key: String,
374        acting_as: Option<UserIdentityAttributes>,
375    ) {
376        let req = AuthenticateRequest {
377            token: AuthenticationToken::Admin(deploy_key, acting_as),
378        };
379        self.request_sender
380            .send(ClientRequest::Authenticate(req))
381            .expect("INTERNAL BUG: Worker has gone away");
382    }
383}
384
385fn deployment_to_ws_url(mut deployment_url: Url) -> anyhow::Result<Url> {
386    let ws_scheme = match deployment_url.scheme() {
387        "http" | "ws" => "ws",
388        "https" | "wss" => "wss",
389        scheme => anyhow::bail!("Unknown scheme {scheme}. Expected http or https."),
390    };
391    deployment_url
392        .set_scheme(ws_scheme)
393        .expect("Scheme not supported");
394    deployment_url.set_path("api/sync");
395    Ok(deployment_url)
396}
397
398/// A builder for creating a [`ConvexClient`] with custom configuration.
399pub struct ConvexClientBuilder {
400    deployment_url: String,
401    client_id: Option<String>,
402    on_state_change: Option<mpsc::Sender<WebSocketState>>,
403}
404
405impl ConvexClientBuilder {
406    /// Create a new [`ConvexClientBuilder`] with the given deployment URL.
407    pub fn new(deployment_url: &str) -> Self {
408        Self {
409            deployment_url: deployment_url.to_string(),
410            client_id: None,
411            on_state_change: None,
412        }
413    }
414
415    /// Set a custom client ID for this client.
416    pub fn with_client_id(mut self, client_id: &str) -> Self {
417        self.client_id = Some(client_id.to_string());
418        self
419    }
420
421    /// Set a channel to be notified of changes to the WebSocket connection
422    /// state.
423    pub fn with_on_state_change(mut self, on_state_change: mpsc::Sender<WebSocketState>) -> Self {
424        self.on_state_change = Some(on_state_change);
425        self
426    }
427
428    /// Build the [`ConvexClient`] with the configured options.
429    ///
430    /// ```no_run
431    /// # use convex::ConvexClientBuilder;
432    /// # #[tokio::main]
433    /// # async fn main() -> anyhow::Result<()> {
434    /// let client = ConvexClientBuilder::new("https://cool-music-123.convex.cloud").build().await?;
435    /// # Ok(())
436    /// # }
437    /// ```
438    pub async fn build(self) -> anyhow::Result<ConvexClient> {
439        ConvexClient::new_from_builder(self).await
440    }
441}
442
443#[cfg(test)]
444pub mod tests {
445    use std::{
446        str::FromStr,
447        sync::Arc,
448        time::Duration,
449    };
450
451    use convex_sync_types::{
452        AuthenticationToken,
453        ClientMessage,
454        LogLinesMessage,
455        Query,
456        QueryId,
457        QuerySetModification,
458        SessionId,
459        StateModification,
460        StateVersion,
461        UdfPath,
462        UserIdentityAttributes,
463    };
464    use futures::StreamExt;
465    use maplit::btreemap;
466    use pretty_assertions::assert_eq;
467    use serde_json::json;
468    use tokio::sync::{
469        broadcast,
470        mpsc,
471    };
472
473    use super::ConvexClient;
474    use crate::{
475        base_client::FunctionResult,
476        client::{
477            deployment_to_ws_url,
478            worker::worker,
479            BaseConvexClient,
480        },
481        sync::{
482            testing::TestProtocolManager,
483            ServerMessage,
484            SyncProtocol,
485        },
486        value::Value,
487        QuerySubscription,
488    };
489
490    impl ConvexClient {
491        pub async fn with_test_protocol() -> anyhow::Result<(Self, TestProtocolManager)> {
492            let _ = tracing_subscriber::fmt()
493                .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
494                .try_init();
495
496            // Channels for the `listen` background thread
497            let (response_sender, response_receiver) = mpsc::channel(1);
498            let (request_sender, request_receiver) = mpsc::unbounded_channel();
499
500            // Listener for when each transaction completes
501            let (watch_sender, watch_receiver) = broadcast::channel(1);
502
503            let test_protocol = TestProtocolManager::open(
504                "ws://test.com".parse()?,
505                response_sender,
506                None,
507                "rust-0.0.1",
508            )
509            .await?;
510            let base_client = BaseConvexClient::new();
511
512            let listen_handle = tokio::spawn(worker(
513                response_receiver,
514                request_receiver,
515                watch_sender,
516                base_client,
517                test_protocol.clone(),
518            ));
519
520            let client = ConvexClient {
521                listen_handle: Some(Arc::new(listen_handle)),
522                request_sender,
523                watch_receiver,
524            };
525            Ok((client, test_protocol))
526        }
527    }
528
529    fn fake_mutation_response(result: FunctionResult) -> (ServerMessage, ServerMessage) {
530        let (transition_response, new_version) = fake_transition(StateVersion::initial(), vec![]);
531        let mutation_response = ServerMessage::MutationResponse {
532            request_id: 0,
533            result: result.into(),
534            ts: Some(new_version.ts),
535            log_lines: LogLinesMessage(vec![]),
536        };
537        (mutation_response, transition_response)
538    }
539
540    fn fake_action_response(result: FunctionResult) -> ServerMessage {
541        ServerMessage::ActionResponse {
542            request_id: 0,
543            result: result.into(),
544            log_lines: LogLinesMessage(vec![]),
545        }
546    }
547
548    fn fake_transition(
549        start_version: StateVersion,
550        modifications: Vec<(QueryId, Value)>,
551    ) -> (ServerMessage, StateVersion) {
552        let end_version = StateVersion {
553            ts: start_version.ts.succ().expect("Succ failed"),
554            ..start_version
555        };
556        (
557            ServerMessage::Transition {
558                start_version,
559                end_version,
560                modifications: modifications
561                    .into_iter()
562                    .map(|(query_id, value)| StateModification::QueryUpdated {
563                        query_id,
564                        value,
565                        journal: None,
566                        log_lines: LogLinesMessage(vec![]),
567                    })
568                    .collect(),
569                client_clock_skew: None,
570                server_ts: None,
571            },
572            end_version,
573        )
574    }
575
576    #[tokio::test]
577    async fn test_mutation() -> anyhow::Result<()> {
578        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
579        test_protocol.take_sent().await;
580
581        let mut res =
582            tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
583        test_protocol.wait_until_n_messages_sent(1).await;
584
585        assert_eq!(
586            test_protocol.take_sent().await,
587            vec![ClientMessage::Mutation {
588                request_id: 0,
589                udf_path: UdfPath::from_str("incrementCounter")?,
590                args: vec![json!({})],
591                component_path: None,
592            }]
593        );
594
595        let mutation_result = FunctionResult::Value(Value::Null);
596        let (mut_resp, transition) = fake_mutation_response(mutation_result.clone());
597        test_protocol.fake_server_response(mut_resp).await?;
598        // Should not be ready until transition completes.
599        tokio::time::timeout(Duration::from_millis(50), &mut res)
600            .await
601            .unwrap_err();
602
603        // Once transition is sent, it is ready.
604        test_protocol.fake_server_response(transition).await?;
605        assert_eq!(res.await??, mutation_result);
606        Ok(())
607    }
608
609    #[tokio::test]
610    async fn test_mutation_error() -> anyhow::Result<()> {
611        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
612        test_protocol.take_sent().await;
613
614        let res =
615            tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
616        test_protocol.wait_until_n_messages_sent(1).await;
617        test_protocol.take_sent().await;
618
619        let mutation_result = FunctionResult::ErrorMessage("JEEPERS".into());
620        let (mut_resp, _transition) = fake_mutation_response(mutation_result.clone());
621        test_protocol.fake_server_response(mut_resp).await?;
622        // Errors should be ready immediately (no transition needed)
623        assert_eq!(res.await??, mutation_result);
624
625        Ok(())
626    }
627
628    #[tokio::test]
629    async fn test_action() -> anyhow::Result<()> {
630        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
631        test_protocol.take_sent().await;
632
633        let action_result = FunctionResult::Value(Value::Null);
634        let server_message = fake_action_response(action_result.clone());
635
636        let res = tokio::spawn(async move { client.action("runAction:hello", btreemap! {}).await });
637        test_protocol.wait_until_n_messages_sent(1).await;
638
639        assert_eq!(
640            test_protocol.take_sent().await,
641            vec![ClientMessage::Action {
642                request_id: 0,
643                udf_path: UdfPath::from_str("runAction:hello")?,
644                args: vec![json!({})],
645                component_path: None,
646            }]
647        );
648
649        test_protocol.fake_server_response(server_message).await?;
650        assert_eq!(res.await??, action_result);
651        Ok(())
652    }
653
654    #[tokio::test]
655    async fn test_auth() -> anyhow::Result<()> {
656        let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
657        test_protocol.take_sent().await;
658
659        // Set token
660        client.set_auth(Some("myauthtoken".into())).await;
661        test_protocol.wait_until_n_messages_sent(1).await;
662        assert_eq!(
663            test_protocol.take_sent().await,
664            vec![ClientMessage::Authenticate {
665                base_version: 0,
666                token: AuthenticationToken::User("myauthtoken".into()),
667            }]
668        );
669
670        // Unset token
671        client.set_auth(None).await;
672        test_protocol.wait_until_n_messages_sent(1).await;
673        assert_eq!(
674            test_protocol.take_sent().await,
675            vec![ClientMessage::Authenticate {
676                base_version: 1,
677                token: AuthenticationToken::None,
678            }]
679        );
680
681        // Set admin auth
682        client.set_admin_auth("myadminauth".into(), None).await;
683        test_protocol.wait_until_n_messages_sent(1).await;
684        assert_eq!(
685            test_protocol.take_sent().await,
686            vec![ClientMessage::Authenticate {
687                base_version: 2,
688                token: AuthenticationToken::Admin("myadminauth".into(), None),
689            }]
690        );
691
692        // Set admin auth acting as user
693        let acting_as = UserIdentityAttributes {
694            name: Some("Barbara Liskov".into()),
695            ..Default::default()
696        };
697        client
698            .set_admin_auth("myadminauth".into(), Some(acting_as.clone()))
699            .await;
700        test_protocol.wait_until_n_messages_sent(1).await;
701        assert_eq!(
702            test_protocol.take_sent().await,
703            vec![ClientMessage::Authenticate {
704                base_version: 3,
705                token: AuthenticationToken::Admin("myadminauth".into(), Some(acting_as)),
706            }]
707        );
708        Ok(())
709    }
710
711    #[tokio::test]
712    async fn test_client_single_subscription() -> anyhow::Result<()> {
713        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
714
715        let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
716        let query_id = subscription1.query_id();
717        assert_eq!(
718            test_protocol.take_sent().await,
719            vec![
720                ClientMessage::Connect {
721                    session_id: SessionId::nil(),
722                    connection_count: 0,
723                    last_close_reason: "InitialConnect".to_string(),
724                    max_observed_timestamp: None,
725                    client_ts: None,
726                },
727                ClientMessage::ModifyQuerySet {
728                    base_version: 0,
729                    new_version: 1,
730                    modifications: vec![QuerySetModification::Add(Query {
731                        query_id,
732                        udf_path: "getValue1".parse()?,
733                        args: vec![json!({})],
734                        journal: None,
735                        component_path: None,
736                    })]
737                },
738            ]
739        );
740
741        test_protocol
742            .fake_server_response(
743                fake_transition(
744                    StateVersion::initial(),
745                    vec![(subscription1.query_id(), 10.into())],
746                )
747                .0,
748            )
749            .await?;
750        assert_eq!(
751            subscription1.next().await,
752            Some(FunctionResult::Value(10.into()))
753        );
754        assert_eq!(
755            client.query("getValue1", btreemap! {}).await?,
756            FunctionResult::Value(10.into())
757        );
758
759        drop(subscription1);
760        test_protocol.wait_until_n_messages_sent(1).await;
761        assert_eq!(
762            test_protocol.take_sent().await,
763            vec![ClientMessage::ModifyQuerySet {
764                base_version: 1,
765                new_version: 2,
766                modifications: vec![QuerySetModification::Remove { query_id }],
767            }]
768        );
769
770        Ok(())
771    }
772
773    #[tokio::test]
774    async fn test_client_subscribe_unsubscribe_subscribe() -> anyhow::Result<()> {
775        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
776        let subscription1b: QuerySubscription;
777        {
778            // This subscription goes out of scope and unsubscribes at the end of this
779            // block. The internal num_subscribers value gets decremented.
780            let _ignored = client.subscribe("getValue1", btreemap! {}).await?;
781            subscription1b = client.subscribe("getValue1", btreemap! {}).await?;
782        }
783        // In the buggy scenario, this subscription gets an ID via num_subscribers ID
784        // that matches subscription1b. That triggers a panic.
785        let subscription1c = client.subscribe("getValue1", btreemap! {}).await?;
786        test_protocol.take_sent().await;
787        let mut watch = client.watch_all();
788
789        test_protocol
790            .fake_server_response(
791                fake_transition(StateVersion::initial(), vec![(QueryId::new(0), 10.into())]).0,
792            )
793            .await?;
794
795        let results = watch.next().await.expect("Watch should have results");
796        assert_eq!(
797            results.get(&subscription1b),
798            Some(&FunctionResult::Value(10.into()))
799        );
800        assert_eq!(
801            results.get(&subscription1c),
802            Some(&FunctionResult::Value(10.into()))
803        );
804        Ok(())
805    }
806
807    #[tokio::test]
808    async fn test_client_consistent_view_watch() -> anyhow::Result<()> {
809        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
810        let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
811        let subscription2a = client.subscribe("getValue2", btreemap! {}).await?;
812        let subscription2b = client.subscribe("getValue2", btreemap! {}).await?;
813        let subscription3 = client.subscribe("getValue3", btreemap! {}).await?;
814        test_protocol.take_sent().await;
815        let mut watch = client.watch_all();
816
817        test_protocol
818            .fake_server_response(
819                fake_transition(
820                    StateVersion::initial(),
821                    vec![(QueryId::new(0), 10.into()), (QueryId::new(1), 20.into())],
822                )
823                .0,
824            )
825            .await?;
826
827        let results = watch.next().await.expect("Watch should have results");
828        assert_eq!(
829            results.get(&subscription1),
830            Some(&FunctionResult::Value(10.into()))
831        );
832        assert_eq!(
833            results.get(&subscription2a),
834            Some(&FunctionResult::Value(20.into()))
835        );
836        assert_eq!(
837            results.get(&subscription2b),
838            Some(&FunctionResult::Value(20.into()))
839        );
840        assert_eq!(results.get(&subscription3), None);
841        assert_eq!(
842            results.iter().collect::<Vec<_>>(),
843            vec![
844                (subscription1.id(), Some(&FunctionResult::Value(10.into()))),
845                (subscription2a.id(), Some(&FunctionResult::Value(20.into()))),
846                (subscription2b.id(), Some(&FunctionResult::Value(20.into()))),
847                (subscription3.id(), None,),
848            ]
849        );
850
851        // Ideally a new watch should immediately give you results, but we don't have
852        // that yet. Need to replace tokio::broadcast with something that buffers 1
853        // item.
854        //let mut watch2 = client.watch();
855        //let results = watch.next().await.expect("Watch should have results");
856        //assert_eq!(results.len(), 3);
857
858        Ok(())
859    }
860
861    #[tokio::test]
862    async fn test_drop_client() -> anyhow::Result<()> {
863        let (mut client, _test_protocol) = ConvexClient::with_test_protocol().await?;
864        let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
865        drop(client);
866        tokio::task::yield_now().await;
867        assert!(subscription1.next().await.is_none());
868        drop(subscription1);
869        Ok(())
870    }
871
872    #[tokio::test]
873    async fn test_client_separate_queries() -> anyhow::Result<()> {
874        let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
875
876        // All three of these should be considered separate
877        let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
878        let subscription2 = client.subscribe("getValue2", btreemap! {}).await?;
879        let subscription3 = client
880            .subscribe("getValue2", btreemap! {"hello".into() => "world".into()})
881            .await?;
882        assert_ne!(subscription1.query_id(), subscription2.query_id());
883        assert_ne!(subscription2.query_id(), subscription3.query_id());
884
885        assert_eq!(
886            test_protocol.take_sent().await,
887            vec![
888                ClientMessage::Connect {
889                    session_id: SessionId::nil(),
890                    connection_count: 0,
891                    last_close_reason: "InitialConnect".to_string(),
892                    max_observed_timestamp: None,
893                    client_ts: None,
894                },
895                ClientMessage::ModifyQuerySet {
896                    base_version: 0,
897                    new_version: 1,
898                    modifications: vec![QuerySetModification::Add(Query {
899                        query_id: subscription1.query_id(),
900                        udf_path: "getValue1".parse()?,
901                        args: vec![json!({})],
902                        journal: None,
903                        component_path: None,
904                    })]
905                },
906                ClientMessage::ModifyQuerySet {
907                    base_version: 1,
908                    new_version: 2,
909                    modifications: vec![QuerySetModification::Add(Query {
910                        query_id: subscription2.query_id(),
911                        udf_path: "getValue2".parse()?,
912                        args: vec![json!({})],
913                        journal: None,
914                        component_path: None,
915                    })]
916                },
917                ClientMessage::ModifyQuerySet {
918                    base_version: 2,
919                    new_version: 3,
920                    modifications: vec![QuerySetModification::Add(Query {
921                        query_id: subscription3.query_id(),
922                        udf_path: "getValue2".parse()?,
923                        args: vec![json!({"hello": "world"})],
924                        journal: None,
925                        component_path: None,
926                    })]
927                },
928            ]
929        );
930
931        Ok(())
932    }
933
934    #[tokio::test]
935    async fn test_client_two_identical_queries() -> anyhow::Result<()> {
936        let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
937
938        // These two should be considered the same query.
939        let mut subscription1 = client.subscribe("getValue", btreemap! {}).await?;
940        let mut subscription2 = client.subscribe("getValue", btreemap! {}).await?;
941
942        assert_ne!(subscription1.subscriber_id, subscription2.subscriber_id);
943        assert_eq!(subscription1.query_id(), subscription2.query_id());
944        let query_id = subscription1.query_id();
945
946        assert_eq!(
947            test_protocol.take_sent().await,
948            vec![
949                ClientMessage::Connect {
950                    session_id: SessionId::nil(),
951                    connection_count: 0,
952                    last_close_reason: "InitialConnect".to_string(),
953                    max_observed_timestamp: None,
954                    client_ts: None,
955                },
956                ClientMessage::ModifyQuerySet {
957                    base_version: 0,
958                    new_version: 1,
959                    modifications: vec![QuerySetModification::Add(Query {
960                        query_id,
961                        udf_path: "getValue".parse()?,
962                        args: vec![json!({})],
963                        journal: None,
964                        component_path: None,
965                    })]
966                },
967            ]
968        );
969
970        let mut version = StateVersion::initial();
971        for i in 1..5 {
972            let (transition, new_version) = fake_transition(version, vec![(query_id, i.into())]);
973            test_protocol.fake_server_response(transition).await?;
974            version = new_version;
975
976            assert_eq!(
977                subscription1.next().await,
978                Some(FunctionResult::Value(i.into()))
979            );
980            assert_eq!(
981                subscription2.next().await,
982                Some(FunctionResult::Value(i.into()))
983            );
984        }
985
986        // A new subscription should auto-initialize with the value if available
987        let mut subscription3 = client.subscribe("getValue", btreemap! {}).await?;
988        assert_eq!(
989            subscription3.next().await,
990            Some(FunctionResult::Value(4.into())),
991        );
992
993        // Dropping sub1 and sub2 should still maintain subscription
994        drop(subscription1);
995        drop(subscription2);
996        let (transition, _new_version) = fake_transition(version, vec![(query_id, 5.into())]);
997        test_protocol.fake_server_response(transition).await?;
998        assert_eq!(
999            subscription3.next().await,
1000            Some(FunctionResult::Value(5.into())),
1001        );
1002
1003        Ok(())
1004    }
1005
1006    #[test]
1007    fn test_deployment_url() -> anyhow::Result<()> {
1008        assert_eq!(
1009            deployment_to_ws_url("http://flying-shark-123.convex.cloud".parse()?)?.to_string(),
1010            "ws://flying-shark-123.convex.cloud/api/sync",
1011        );
1012        assert_eq!(
1013            deployment_to_ws_url("https://flying-shark-123.convex.cloud".parse()?)?.to_string(),
1014            "wss://flying-shark-123.convex.cloud/api/sync",
1015        );
1016        assert_eq!(
1017            deployment_to_ws_url("ws://flying-shark-123.convex.cloud".parse()?)?.to_string(),
1018            "ws://flying-shark-123.convex.cloud/api/sync",
1019        );
1020        assert_eq!(
1021            deployment_to_ws_url("wss://flying-shark-123.convex.cloud".parse()?)?.to_string(),
1022            "wss://flying-shark-123.convex.cloud/api/sync",
1023        );
1024        assert_eq!(
1025            deployment_to_ws_url("ftp://flying-shark-123.convex.cloud".parse()?)
1026                .unwrap_err()
1027                .to_string(),
1028            "Unknown scheme ftp. Expected http or https.",
1029        );
1030        Ok(())
1031    }
1032}