1use std::{
2 collections::BTreeMap,
3 convert::Infallible,
4 sync::Arc,
5};
6
7use convex_sync_types::{
8 AuthenticationToken,
9 UdfPath,
10 UserIdentityAttributes,
11};
12#[cfg(doc)]
13use futures::Stream;
14use futures::StreamExt;
15use tokio::{
16 sync::{
17 broadcast,
18 mpsc,
19 oneshot,
20 },
21 task::JoinHandle,
22};
23use tokio_stream::wrappers::BroadcastStream;
24use url::Url;
25
26use self::worker::AuthenticateRequest;
27#[cfg(doc)]
28use crate::SubscriberId;
29use crate::{
30 base_client::{
31 BaseConvexClient,
32 QueryResults,
33 },
34 client::{
35 subscription::{
36 QuerySetSubscription,
37 QuerySubscription,
38 },
39 worker::{
40 worker,
41 ActionRequest,
42 ClientRequest,
43 MutationRequest,
44 SubscribeRequest,
45 },
46 },
47 sync::{
48 web_socket_manager::WebSocketManager,
49 SyncProtocol,
50 WebSocketState,
51 },
52 value::Value,
53 FunctionResult,
54};
55
56pub mod subscription;
57mod worker;
58
/// Version of this crate as reported by Cargo at compile time, or `None` when
/// `CARGO_PKG_VERSION` was not set in the build environment.
const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION");
60
/// Handle used to talk to a Convex deployment.
///
/// The client itself holds no connection state: it forwards requests over a
/// channel to a worker task spawned at construction time and shares that
/// task with all of its clones.
pub struct ConvexClient {
    /// Join handle of the spawned worker task, shared between clones.
    /// It is only taken (left as `None`) by the `Drop` implementation.
    listen_handle: Option<Arc<JoinHandle<Infallible>>>,
    /// Sending half of the channel carrying requests to the worker task.
    request_sender: mpsc::UnboundedSender<ClientRequest>,
    /// Broadcast receiver for `QueryResults`; it is resubscribed for every
    /// clone and for every `watch_all` call.
    watch_receiver: broadcast::Receiver<QueryResults>,
}
94
95impl Clone for ConvexClient {
98 fn clone(&self) -> Self {
99 Self {
100 listen_handle: self.listen_handle.clone(),
101 request_sender: self.request_sender.clone(),
102 watch_receiver: self.watch_receiver.resubscribe(),
103 }
104 }
105}
106
107impl Drop for ConvexClient {
110 fn drop(&mut self) {
111 if let Ok(j_handle) = Arc::try_unwrap(
112 self.listen_handle
113 .take()
114 .expect("INTERNAL BUG: listen handle should never be none"),
115 ) {
116 j_handle.abort()
117 }
118 }
119}
120
121impl ConvexClient {
122 pub async fn new(deployment_url: &str) -> anyhow::Result<Self> {
133 ConvexClient::new_from_builder(ConvexClientBuilder::new(deployment_url)).await
134 }
135
136 #[doc(hidden)]
137 pub async fn new_from_builder(builder: ConvexClientBuilder) -> anyhow::Result<Self> {
138 let client_id = builder
139 .client_id
140 .unwrap_or_else(|| format!("rust-{}", VERSION.unwrap_or("unknown")));
141 let ws_url = deployment_to_ws_url(builder.deployment_url.as_str().try_into()?)?;
142
143 let (response_sender, response_receiver) = mpsc::channel(1);
145 let (request_sender, request_receiver) = mpsc::unbounded_channel();
146
147 let (watch_sender, watch_receiver) = broadcast::channel(1);
149
150 let base_client = BaseConvexClient::new();
151
152 let protocol = WebSocketManager::open(
153 ws_url,
154 response_sender,
155 builder.on_state_change,
156 client_id.as_str(),
157 )
158 .await?;
159
160 let listen_handle = tokio::spawn(worker(
161 response_receiver,
162 request_receiver,
163 watch_sender,
164 base_client,
165 protocol,
166 ));
167 let client = ConvexClient {
168 listen_handle: Some(Arc::new(listen_handle)),
169 request_sender,
170 watch_receiver,
171 };
172 Ok(client)
173 }
174
175 pub async fn subscribe(
196 &mut self,
197 name: &str,
198 args: BTreeMap<String, Value>,
199 ) -> anyhow::Result<QuerySubscription> {
200 let (tx, rx) = oneshot::channel();
201
202 let udf_path = name.parse()?;
203 let request = SubscribeRequest { udf_path, args };
204
205 self.request_sender.send(ClientRequest::Subscribe(
206 request,
207 tx,
208 self.request_sender.clone(),
209 ))?;
210
211 let res = rx.await?;
212 Ok(res)
213 }
214
215 pub async fn query(
235 &mut self,
236 name: &str,
237 args: BTreeMap<String, Value>,
238 ) -> anyhow::Result<FunctionResult> {
239 Ok(self
240 .subscribe(name, args)
241 .await?
242 .next()
243 .await
244 .expect("INTERNAL BUG: Convex Client dropped prematurely."))
245 }
246
247 pub async fn mutation(
264 &mut self,
265 name: &str,
266 args: BTreeMap<String, Value>,
267 ) -> anyhow::Result<FunctionResult> {
268 let (tx, rx) = oneshot::channel();
269
270 let udf_path: UdfPath = name.parse()?;
271 let request = MutationRequest { udf_path, args };
272
273 self.request_sender
274 .send(ClientRequest::Mutation(request, tx))?;
275
276 let res = rx.await?;
277 Ok(res.await?)
278 }
279
280 pub async fn action(
297 &mut self,
298 name: &str,
299 args: BTreeMap<String, Value>,
300 ) -> anyhow::Result<FunctionResult> {
301 let (tx, rx) = oneshot::channel();
302
303 let udf_path: UdfPath = name.parse()?;
304 let request = ActionRequest { udf_path, args };
305
306 self.request_sender
307 .send(ClientRequest::Action(request, tx))?;
308
309 let res = rx.await?;
310 Ok(res.await?)
311 }
312
313 pub fn watch_all(&self) -> QuerySetSubscription {
344 QuerySetSubscription::new(BroadcastStream::new(self.watch_receiver.resubscribe()))
345 }
346
347 pub async fn set_auth(&mut self, token: Option<String>) {
353 let req = AuthenticateRequest {
354 token: match token {
355 None => AuthenticationToken::None,
356 Some(token) => AuthenticationToken::User(token),
357 },
358 };
359 self.request_sender
360 .send(ClientRequest::Authenticate(req))
361 .expect("INTERNAL BUG: Worker has gone away");
362 }
363
364 #[doc(hidden)]
371 pub async fn set_admin_auth(
372 &mut self,
373 deploy_key: String,
374 acting_as: Option<UserIdentityAttributes>,
375 ) {
376 let req = AuthenticateRequest {
377 token: AuthenticationToken::Admin(deploy_key, acting_as),
378 };
379 self.request_sender
380 .send(ClientRequest::Authenticate(req))
381 .expect("INTERNAL BUG: Worker has gone away");
382 }
383}
384
385fn deployment_to_ws_url(mut deployment_url: Url) -> anyhow::Result<Url> {
386 let ws_scheme = match deployment_url.scheme() {
387 "http" | "ws" => "ws",
388 "https" | "wss" => "wss",
389 scheme => anyhow::bail!("Unknown scheme {scheme}. Expected http or https."),
390 };
391 deployment_url
392 .set_scheme(ws_scheme)
393 .expect("Scheme not supported");
394 deployment_url.set_path("api/sync");
395 Ok(deployment_url)
396}
397
/// Builder collecting the options used to construct a [`ConvexClient`].
pub struct ConvexClientBuilder {
    /// Deployment URL as given by the caller; parsed when the client is built.
    deployment_url: String,
    /// Client id override; when `None`, an id derived from the crate version
    /// is used.
    client_id: Option<String>,
    /// Optional channel of `WebSocketState` values, handed to the websocket
    /// manager when the client is built.
    on_state_change: Option<mpsc::Sender<WebSocketState>>,
}
404
405impl ConvexClientBuilder {
406 pub fn new(deployment_url: &str) -> Self {
408 Self {
409 deployment_url: deployment_url.to_string(),
410 client_id: None,
411 on_state_change: None,
412 }
413 }
414
415 pub fn with_client_id(mut self, client_id: &str) -> Self {
417 self.client_id = Some(client_id.to_string());
418 self
419 }
420
421 pub fn with_on_state_change(mut self, on_state_change: mpsc::Sender<WebSocketState>) -> Self {
424 self.on_state_change = Some(on_state_change);
425 self
426 }
427
428 pub async fn build(self) -> anyhow::Result<ConvexClient> {
439 ConvexClient::new_from_builder(self).await
440 }
441}
442
443#[cfg(test)]
444pub mod tests {
445 use std::{
446 str::FromStr,
447 sync::Arc,
448 time::Duration,
449 };
450
451 use convex_sync_types::{
452 AuthenticationToken,
453 ClientMessage,
454 LogLinesMessage,
455 Query,
456 QueryId,
457 QuerySetModification,
458 SessionId,
459 StateModification,
460 StateVersion,
461 UdfPath,
462 UserIdentityAttributes,
463 };
464 use futures::StreamExt;
465 use maplit::btreemap;
466 use pretty_assertions::assert_eq;
467 use serde_json::json;
468 use tokio::sync::{
469 broadcast,
470 mpsc,
471 };
472
473 use super::ConvexClient;
474 use crate::{
475 base_client::FunctionResult,
476 client::{
477 deployment_to_ws_url,
478 worker::worker,
479 BaseConvexClient,
480 },
481 sync::{
482 testing::TestProtocolManager,
483 ServerMessage,
484 SyncProtocol,
485 },
486 value::Value,
487 QuerySubscription,
488 };
489
490 impl ConvexClient {
491 pub async fn with_test_protocol() -> anyhow::Result<(Self, TestProtocolManager)> {
492 let _ = tracing_subscriber::fmt()
493 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
494 .try_init();
495
496 let (response_sender, response_receiver) = mpsc::channel(1);
498 let (request_sender, request_receiver) = mpsc::unbounded_channel();
499
500 let (watch_sender, watch_receiver) = broadcast::channel(1);
502
503 let test_protocol = TestProtocolManager::open(
504 "ws://test.com".parse()?,
505 response_sender,
506 None,
507 "rust-0.0.1",
508 )
509 .await?;
510 let base_client = BaseConvexClient::new();
511
512 let listen_handle = tokio::spawn(worker(
513 response_receiver,
514 request_receiver,
515 watch_sender,
516 base_client,
517 test_protocol.clone(),
518 ));
519
520 let client = ConvexClient {
521 listen_handle: Some(Arc::new(listen_handle)),
522 request_sender,
523 watch_receiver,
524 };
525 Ok((client, test_protocol))
526 }
527 }
528
529 fn fake_mutation_response(result: FunctionResult) -> (ServerMessage, ServerMessage) {
530 let (transition_response, new_version) = fake_transition(StateVersion::initial(), vec![]);
531 let mutation_response = ServerMessage::MutationResponse {
532 request_id: 0,
533 result: result.into(),
534 ts: Some(new_version.ts),
535 log_lines: LogLinesMessage(vec![]),
536 };
537 (mutation_response, transition_response)
538 }
539
540 fn fake_action_response(result: FunctionResult) -> ServerMessage {
541 ServerMessage::ActionResponse {
542 request_id: 0,
543 result: result.into(),
544 log_lines: LogLinesMessage(vec![]),
545 }
546 }
547
548 fn fake_transition(
549 start_version: StateVersion,
550 modifications: Vec<(QueryId, Value)>,
551 ) -> (ServerMessage, StateVersion) {
552 let end_version = StateVersion {
553 ts: start_version.ts.succ().expect("Succ failed"),
554 ..start_version
555 };
556 (
557 ServerMessage::Transition {
558 start_version,
559 end_version,
560 modifications: modifications
561 .into_iter()
562 .map(|(query_id, value)| StateModification::QueryUpdated {
563 query_id,
564 value,
565 journal: None,
566 log_lines: LogLinesMessage(vec![]),
567 })
568 .collect(),
569 client_clock_skew: None,
570 server_ts: None,
571 },
572 end_version,
573 )
574 }
575
576 #[tokio::test]
577 async fn test_mutation() -> anyhow::Result<()> {
578 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
579 test_protocol.take_sent().await;
580
581 let mut res =
582 tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
583 test_protocol.wait_until_n_messages_sent(1).await;
584
585 assert_eq!(
586 test_protocol.take_sent().await,
587 vec![ClientMessage::Mutation {
588 request_id: 0,
589 udf_path: UdfPath::from_str("incrementCounter")?,
590 args: vec![json!({})],
591 component_path: None,
592 }]
593 );
594
595 let mutation_result = FunctionResult::Value(Value::Null);
596 let (mut_resp, transition) = fake_mutation_response(mutation_result.clone());
597 test_protocol.fake_server_response(mut_resp).await?;
598 tokio::time::timeout(Duration::from_millis(50), &mut res)
600 .await
601 .unwrap_err();
602
603 test_protocol.fake_server_response(transition).await?;
605 assert_eq!(res.await??, mutation_result);
606 Ok(())
607 }
608
609 #[tokio::test]
610 async fn test_mutation_error() -> anyhow::Result<()> {
611 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
612 test_protocol.take_sent().await;
613
614 let res =
615 tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
616 test_protocol.wait_until_n_messages_sent(1).await;
617 test_protocol.take_sent().await;
618
619 let mutation_result = FunctionResult::ErrorMessage("JEEPERS".into());
620 let (mut_resp, _transition) = fake_mutation_response(mutation_result.clone());
621 test_protocol.fake_server_response(mut_resp).await?;
622 assert_eq!(res.await??, mutation_result);
624
625 Ok(())
626 }
627
628 #[tokio::test]
629 async fn test_action() -> anyhow::Result<()> {
630 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
631 test_protocol.take_sent().await;
632
633 let action_result = FunctionResult::Value(Value::Null);
634 let server_message = fake_action_response(action_result.clone());
635
636 let res = tokio::spawn(async move { client.action("runAction:hello", btreemap! {}).await });
637 test_protocol.wait_until_n_messages_sent(1).await;
638
639 assert_eq!(
640 test_protocol.take_sent().await,
641 vec![ClientMessage::Action {
642 request_id: 0,
643 udf_path: UdfPath::from_str("runAction:hello")?,
644 args: vec![json!({})],
645 component_path: None,
646 }]
647 );
648
649 test_protocol.fake_server_response(server_message).await?;
650 assert_eq!(res.await??, action_result);
651 Ok(())
652 }
653
654 #[tokio::test]
655 async fn test_auth() -> anyhow::Result<()> {
656 let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
657 test_protocol.take_sent().await;
658
659 client.set_auth(Some("myauthtoken".into())).await;
661 test_protocol.wait_until_n_messages_sent(1).await;
662 assert_eq!(
663 test_protocol.take_sent().await,
664 vec![ClientMessage::Authenticate {
665 base_version: 0,
666 token: AuthenticationToken::User("myauthtoken".into()),
667 }]
668 );
669
670 client.set_auth(None).await;
672 test_protocol.wait_until_n_messages_sent(1).await;
673 assert_eq!(
674 test_protocol.take_sent().await,
675 vec![ClientMessage::Authenticate {
676 base_version: 1,
677 token: AuthenticationToken::None,
678 }]
679 );
680
681 client.set_admin_auth("myadminauth".into(), None).await;
683 test_protocol.wait_until_n_messages_sent(1).await;
684 assert_eq!(
685 test_protocol.take_sent().await,
686 vec![ClientMessage::Authenticate {
687 base_version: 2,
688 token: AuthenticationToken::Admin("myadminauth".into(), None),
689 }]
690 );
691
692 let acting_as = UserIdentityAttributes {
694 name: Some("Barbara Liskov".into()),
695 ..Default::default()
696 };
697 client
698 .set_admin_auth("myadminauth".into(), Some(acting_as.clone()))
699 .await;
700 test_protocol.wait_until_n_messages_sent(1).await;
701 assert_eq!(
702 test_protocol.take_sent().await,
703 vec![ClientMessage::Authenticate {
704 base_version: 3,
705 token: AuthenticationToken::Admin("myadminauth".into(), Some(acting_as)),
706 }]
707 );
708 Ok(())
709 }
710
711 #[tokio::test]
712 async fn test_client_single_subscription() -> anyhow::Result<()> {
713 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
714
715 let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
716 let query_id = subscription1.query_id();
717 assert_eq!(
718 test_protocol.take_sent().await,
719 vec![
720 ClientMessage::Connect {
721 session_id: SessionId::nil(),
722 connection_count: 0,
723 last_close_reason: "InitialConnect".to_string(),
724 max_observed_timestamp: None,
725 client_ts: None,
726 },
727 ClientMessage::ModifyQuerySet {
728 base_version: 0,
729 new_version: 1,
730 modifications: vec![QuerySetModification::Add(Query {
731 query_id,
732 udf_path: "getValue1".parse()?,
733 args: vec![json!({})],
734 journal: None,
735 component_path: None,
736 })]
737 },
738 ]
739 );
740
741 test_protocol
742 .fake_server_response(
743 fake_transition(
744 StateVersion::initial(),
745 vec![(subscription1.query_id(), 10.into())],
746 )
747 .0,
748 )
749 .await?;
750 assert_eq!(
751 subscription1.next().await,
752 Some(FunctionResult::Value(10.into()))
753 );
754 assert_eq!(
755 client.query("getValue1", btreemap! {}).await?,
756 FunctionResult::Value(10.into())
757 );
758
759 drop(subscription1);
760 test_protocol.wait_until_n_messages_sent(1).await;
761 assert_eq!(
762 test_protocol.take_sent().await,
763 vec![ClientMessage::ModifyQuerySet {
764 base_version: 1,
765 new_version: 2,
766 modifications: vec![QuerySetModification::Remove { query_id }],
767 }]
768 );
769
770 Ok(())
771 }
772
773 #[tokio::test]
774 async fn test_client_subscribe_unsubscribe_subscribe() -> anyhow::Result<()> {
775 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
776 let subscription1b: QuerySubscription;
777 {
778 let _ignored = client.subscribe("getValue1", btreemap! {}).await?;
781 subscription1b = client.subscribe("getValue1", btreemap! {}).await?;
782 }
783 let subscription1c = client.subscribe("getValue1", btreemap! {}).await?;
786 test_protocol.take_sent().await;
787 let mut watch = client.watch_all();
788
789 test_protocol
790 .fake_server_response(
791 fake_transition(StateVersion::initial(), vec![(QueryId::new(0), 10.into())]).0,
792 )
793 .await?;
794
795 let results = watch.next().await.expect("Watch should have results");
796 assert_eq!(
797 results.get(&subscription1b),
798 Some(&FunctionResult::Value(10.into()))
799 );
800 assert_eq!(
801 results.get(&subscription1c),
802 Some(&FunctionResult::Value(10.into()))
803 );
804 Ok(())
805 }
806
807 #[tokio::test]
808 async fn test_client_consistent_view_watch() -> anyhow::Result<()> {
809 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
810 let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
811 let subscription2a = client.subscribe("getValue2", btreemap! {}).await?;
812 let subscription2b = client.subscribe("getValue2", btreemap! {}).await?;
813 let subscription3 = client.subscribe("getValue3", btreemap! {}).await?;
814 test_protocol.take_sent().await;
815 let mut watch = client.watch_all();
816
817 test_protocol
818 .fake_server_response(
819 fake_transition(
820 StateVersion::initial(),
821 vec![(QueryId::new(0), 10.into()), (QueryId::new(1), 20.into())],
822 )
823 .0,
824 )
825 .await?;
826
827 let results = watch.next().await.expect("Watch should have results");
828 assert_eq!(
829 results.get(&subscription1),
830 Some(&FunctionResult::Value(10.into()))
831 );
832 assert_eq!(
833 results.get(&subscription2a),
834 Some(&FunctionResult::Value(20.into()))
835 );
836 assert_eq!(
837 results.get(&subscription2b),
838 Some(&FunctionResult::Value(20.into()))
839 );
840 assert_eq!(results.get(&subscription3), None);
841 assert_eq!(
842 results.iter().collect::<Vec<_>>(),
843 vec![
844 (subscription1.id(), Some(&FunctionResult::Value(10.into()))),
845 (subscription2a.id(), Some(&FunctionResult::Value(20.into()))),
846 (subscription2b.id(), Some(&FunctionResult::Value(20.into()))),
847 (subscription3.id(), None,),
848 ]
849 );
850
851 Ok(())
859 }
860
861 #[tokio::test]
862 async fn test_drop_client() -> anyhow::Result<()> {
863 let (mut client, _test_protocol) = ConvexClient::with_test_protocol().await?;
864 let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
865 drop(client);
866 tokio::task::yield_now().await;
867 assert!(subscription1.next().await.is_none());
868 drop(subscription1);
869 Ok(())
870 }
871
872 #[tokio::test]
873 async fn test_client_separate_queries() -> anyhow::Result<()> {
874 let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
875
876 let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
878 let subscription2 = client.subscribe("getValue2", btreemap! {}).await?;
879 let subscription3 = client
880 .subscribe("getValue2", btreemap! {"hello".into() => "world".into()})
881 .await?;
882 assert_ne!(subscription1.query_id(), subscription2.query_id());
883 assert_ne!(subscription2.query_id(), subscription3.query_id());
884
885 assert_eq!(
886 test_protocol.take_sent().await,
887 vec![
888 ClientMessage::Connect {
889 session_id: SessionId::nil(),
890 connection_count: 0,
891 last_close_reason: "InitialConnect".to_string(),
892 max_observed_timestamp: None,
893 client_ts: None,
894 },
895 ClientMessage::ModifyQuerySet {
896 base_version: 0,
897 new_version: 1,
898 modifications: vec![QuerySetModification::Add(Query {
899 query_id: subscription1.query_id(),
900 udf_path: "getValue1".parse()?,
901 args: vec![json!({})],
902 journal: None,
903 component_path: None,
904 })]
905 },
906 ClientMessage::ModifyQuerySet {
907 base_version: 1,
908 new_version: 2,
909 modifications: vec![QuerySetModification::Add(Query {
910 query_id: subscription2.query_id(),
911 udf_path: "getValue2".parse()?,
912 args: vec![json!({})],
913 journal: None,
914 component_path: None,
915 })]
916 },
917 ClientMessage::ModifyQuerySet {
918 base_version: 2,
919 new_version: 3,
920 modifications: vec![QuerySetModification::Add(Query {
921 query_id: subscription3.query_id(),
922 udf_path: "getValue2".parse()?,
923 args: vec![json!({"hello": "world"})],
924 journal: None,
925 component_path: None,
926 })]
927 },
928 ]
929 );
930
931 Ok(())
932 }
933
934 #[tokio::test]
935 async fn test_client_two_identical_queries() -> anyhow::Result<()> {
936 let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
937
938 let mut subscription1 = client.subscribe("getValue", btreemap! {}).await?;
940 let mut subscription2 = client.subscribe("getValue", btreemap! {}).await?;
941
942 assert_ne!(subscription1.subscriber_id, subscription2.subscriber_id);
943 assert_eq!(subscription1.query_id(), subscription2.query_id());
944 let query_id = subscription1.query_id();
945
946 assert_eq!(
947 test_protocol.take_sent().await,
948 vec![
949 ClientMessage::Connect {
950 session_id: SessionId::nil(),
951 connection_count: 0,
952 last_close_reason: "InitialConnect".to_string(),
953 max_observed_timestamp: None,
954 client_ts: None,
955 },
956 ClientMessage::ModifyQuerySet {
957 base_version: 0,
958 new_version: 1,
959 modifications: vec![QuerySetModification::Add(Query {
960 query_id,
961 udf_path: "getValue".parse()?,
962 args: vec![json!({})],
963 journal: None,
964 component_path: None,
965 })]
966 },
967 ]
968 );
969
970 let mut version = StateVersion::initial();
971 for i in 1..5 {
972 let (transition, new_version) = fake_transition(version, vec![(query_id, i.into())]);
973 test_protocol.fake_server_response(transition).await?;
974 version = new_version;
975
976 assert_eq!(
977 subscription1.next().await,
978 Some(FunctionResult::Value(i.into()))
979 );
980 assert_eq!(
981 subscription2.next().await,
982 Some(FunctionResult::Value(i.into()))
983 );
984 }
985
986 let mut subscription3 = client.subscribe("getValue", btreemap! {}).await?;
988 assert_eq!(
989 subscription3.next().await,
990 Some(FunctionResult::Value(4.into())),
991 );
992
993 drop(subscription1);
995 drop(subscription2);
996 let (transition, _new_version) = fake_transition(version, vec![(query_id, 5.into())]);
997 test_protocol.fake_server_response(transition).await?;
998 assert_eq!(
999 subscription3.next().await,
1000 Some(FunctionResult::Value(5.into())),
1001 );
1002
1003 Ok(())
1004 }
1005
/// `deployment_to_ws_url` maps http(s)/ws(s) URLs to the `api/sync`
/// websocket endpoint and rejects other schemes.
#[test]
fn test_deployment_url() -> anyhow::Result<()> {
    // (input URL, expected websocket URL)
    let accepted = [
        (
            "http://flying-shark-123.convex.cloud",
            "ws://flying-shark-123.convex.cloud/api/sync",
        ),
        (
            "https://flying-shark-123.convex.cloud",
            "wss://flying-shark-123.convex.cloud/api/sync",
        ),
        (
            "ws://flying-shark-123.convex.cloud",
            "ws://flying-shark-123.convex.cloud/api/sync",
        ),
        (
            "wss://flying-shark-123.convex.cloud",
            "wss://flying-shark-123.convex.cloud/api/sync",
        ),
    ];
    for &(input, expected) in &accepted {
        let ws_url = deployment_to_ws_url(input.parse()?)?;
        assert_eq!(ws_url.to_string(), expected);
    }

    let rejected = deployment_to_ws_url("ftp://flying-shark-123.convex.cloud".parse()?);
    assert_eq!(
        rejected.unwrap_err().to_string(),
        "Unknown scheme ftp. Expected http or https.",
    );
    Ok(())
}
1032}