use std::{
collections::BTreeMap,
convert::Infallible,
future::Future,
pin::Pin,
sync::Arc,
};
use convex_sync_types::{
AuthenticationToken,
UdfPath,
UserIdentityAttributes,
};
#[cfg(doc)]
use futures::Stream;
use futures::StreamExt;
use tokio::{
sync::{
broadcast,
mpsc,
oneshot,
},
task::JoinHandle,
};
use tokio_stream::wrappers::BroadcastStream;
use url::Url;
pub use crate::base_client::AuthTokenFetcher;
#[cfg(doc)]
use crate::SubscriberId;
use crate::{
base_client::{
BaseConvexClient,
QueryResults,
},
client::{
subscription::{
QuerySetSubscription,
QuerySubscription,
},
worker::{
worker,
ActionRequest,
ClientRequest,
MutationRequest,
SubscribeRequest,
},
},
sync::{
web_socket_manager::WebSocketManager,
SyncProtocol,
WebSocketState,
},
value::Value,
FunctionResult,
};
pub mod subscription;
mod worker;
/// Crate version read at compile time from the `CARGO_PKG_VERSION` environment
/// variable; `None` if that variable was not set while building.
/// Used to derive the default client id (`rust-<version>`, or `rust-unknown`).
const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION");
/// Asynchronous client handle for a Convex deployment.
///
/// The handle forwards requests over a channel to a background worker task
/// spawned at construction time. Cloning yields another handle that shares the
/// same worker.
pub struct ConvexClient {
/// Join handle of the spawned worker task, shared between clones through an
/// `Arc`. Wrapped in `Option` so that `Drop` can move it out of `&mut self`;
/// it is `Some` everywhere else.
listen_handle: Option<Arc<JoinHandle<Infallible>>>,
/// Sending half of the unbounded channel carrying `ClientRequest`s to the
/// worker.
request_sender: mpsc::UnboundedSender<ClientRequest>,
/// Broadcast receiver for `QueryResults`; `watch_all` and `Clone` create
/// fresh receivers from it via `resubscribe`.
watch_receiver: broadcast::Receiver<QueryResults>,
}
impl Clone for ConvexClient {
    /// Creates another handle that shares the same background worker.
    ///
    /// The worker handle and request sender are cloned; the broadcast receiver
    /// is obtained with `resubscribe` instead, so the clone gets its own
    /// receiver on the same broadcast channel.
    fn clone(&self) -> Self {
        let listen_handle = self.listen_handle.clone();
        let request_sender = self.request_sender.clone();
        let watch_receiver = self.watch_receiver.resubscribe();
        Self {
            listen_handle,
            request_sender,
            watch_receiver,
        }
    }
}
impl Drop for ConvexClient {
    /// Aborts the background worker task once the last handle sharing it is
    /// dropped.
    ///
    /// Every clone holds a reference to the same `JoinHandle` through an
    /// `Arc`. Each drop gives up its reference; the drop that releases the
    /// final reference receives the `JoinHandle` and aborts the task.
    fn drop(&mut self) {
        // `Arc::into_inner` is used instead of `Arc::try_unwrap` because the
        // latter is racy: when several clones are dropped concurrently every
        // `try_unwrap` call can fail, after which the `JoinHandle` is dropped
        // without `abort()` and the worker task is left detached.
        // `into_inner` guarantees that exactly one of the drops gets the handle.
        //
        // A missing handle is tolerated rather than treated as a panic, since
        // panicking inside `drop` during unwinding aborts the process.
        let last_handle = self.listen_handle.take().and_then(Arc::into_inner);
        if let Some(worker_task) = last_handle {
            worker_task.abort();
        }
    }
}
impl ConvexClient {
pub async fn new(deployment_url: &str) -> anyhow::Result<Self> {
ConvexClient::new_from_builder(ConvexClientBuilder::new(deployment_url)).await
}
#[doc(hidden)]
pub async fn new_from_builder(builder: ConvexClientBuilder) -> anyhow::Result<Self> {
let client_id = builder
.client_id
.unwrap_or_else(|| format!("rust-{}", VERSION.unwrap_or("unknown")));
let ws_url = deployment_to_ws_url(builder.deployment_url.as_str().try_into()?)?;
let (response_sender, response_receiver) = mpsc::channel(1);
let (request_sender, request_receiver) = mpsc::unbounded_channel();
let (watch_sender, watch_receiver) = broadcast::channel(1);
let base_client = BaseConvexClient::new();
let protocol = WebSocketManager::open(
ws_url,
response_sender,
builder.on_state_change,
client_id.as_str(),
)
.await?;
let listen_handle = tokio::spawn(worker(
response_receiver,
request_receiver,
watch_sender,
base_client,
protocol,
));
let client = ConvexClient {
listen_handle: Some(Arc::new(listen_handle)),
request_sender,
watch_receiver,
};
Ok(client)
}
/// Subscribes to the query function `name` called with `args`.
///
/// A `Subscribe` request is handed to the worker together with a oneshot
/// sender for the reply and a clone of the request sender; the
/// `QuerySubscription` received on the oneshot is returned.
///
/// # Errors
/// Fails if `name` does not parse as a function path, if the request cannot
/// be sent to the worker, or if the worker drops the reply channel.
pub async fn subscribe(
    &mut self,
    name: &str,
    args: BTreeMap<String, Value>,
) -> anyhow::Result<QuerySubscription> {
    let udf_path = name.parse()?;
    let (reply_tx, reply_rx) = oneshot::channel();
    let message = ClientRequest::Subscribe(
        SubscribeRequest { udf_path, args },
        reply_tx,
        self.request_sender.clone(),
    );
    self.request_sender.send(message)?;
    let subscription = reply_rx.await?;
    Ok(subscription)
}
/// Runs the query function `name` with `args` once and returns its result.
///
/// Implemented as a short-lived subscription: it subscribes, waits for the
/// first result from the subscription stream, and lets the subscription drop
/// when the function returns.
///
/// # Errors
/// Propagates any error from `subscribe`.
///
/// # Panics
/// Panics if the subscription stream ends before producing a result.
pub async fn query(
    &mut self,
    name: &str,
    args: BTreeMap<String, Value>,
) -> anyhow::Result<FunctionResult> {
    let mut subscription = self.subscribe(name, args).await?;
    let first_result = subscription
        .next()
        .await
        .expect("INTERNAL BUG: Convex Client dropped prematurely.");
    Ok(first_result)
}
/// Runs the mutation function `name` with `args` and returns its result.
///
/// The request is sent to the worker with a oneshot sender. The worker
/// replies with a second future, which is awaited for the final
/// `FunctionResult`.
///
/// # Errors
/// Fails if `name` does not parse as a `UdfPath`, if the request cannot be
/// sent to the worker, or if either reply channel is dropped.
pub async fn mutation(
    &mut self,
    name: &str,
    args: BTreeMap<String, Value>,
) -> anyhow::Result<FunctionResult> {
    let udf_path: UdfPath = name.parse()?;
    let (reply_tx, reply_rx) = oneshot::channel();
    let message = ClientRequest::Mutation(MutationRequest { udf_path, args }, reply_tx);
    self.request_sender.send(message)?;
    // First await: the worker accepted the request and handed back a future.
    let pending_result = reply_rx.await?;
    // Second await: the mutation's actual outcome.
    let outcome = pending_result.await?;
    Ok(outcome)
}
/// Runs the action function `name` with `args` and returns its result.
///
/// The request is sent to the worker with a oneshot sender. The worker
/// replies with a second future, which is awaited for the final
/// `FunctionResult`.
///
/// # Errors
/// Fails if `name` does not parse as a `UdfPath`, if the request cannot be
/// sent to the worker, or if either reply channel is dropped.
pub async fn action(
    &mut self,
    name: &str,
    args: BTreeMap<String, Value>,
) -> anyhow::Result<FunctionResult> {
    let udf_path: UdfPath = name.parse()?;
    let (reply_tx, reply_rx) = oneshot::channel();
    let message = ClientRequest::Action(ActionRequest { udf_path, args }, reply_tx);
    self.request_sender.send(message)?;
    // First await: the worker accepted the request and handed back a future.
    let pending_result = reply_rx.await?;
    // Second await: the action's actual outcome.
    let outcome = pending_result.await?;
    Ok(outcome)
}
/// Returns a `QuerySetSubscription` backed by a fresh receiver on the
/// client's `QueryResults` broadcast channel.
pub fn watch_all(&self) -> QuerySetSubscription {
    let receiver = self.watch_receiver.resubscribe();
    let stream = BroadcastStream::new(receiver);
    QuerySetSubscription::new(stream)
}
/// Sets or clears authentication using a fixed user token.
///
/// `Some(token)` is wrapped in an `AuthTokenFetcher` that ignores the
/// force-refresh flag and always yields `AuthenticationToken::User(token)`;
/// `None` sends an `Authenticate` request without a fetcher.
///
/// # Panics
/// Panics if the request cannot be sent because the worker is gone.
pub async fn set_auth(&mut self, token: Option<String>) {
    let fetcher: Option<AuthTokenFetcher> = match token {
        Some(user_token) => {
            let fetch: AuthTokenFetcher = Box::new(move |_force_refresh: bool| {
                // Each call hands out its own copy of the token.
                let token_copy = user_token.clone();
                let fut: Pin<Box<dyn Future<Output = anyhow::Result<AuthenticationToken>> + Send>> =
                    Box::pin(async move { Ok(AuthenticationToken::User(token_copy)) });
                fut
            });
            Some(fetch)
        },
        None => None,
    };
    let request = ClientRequest::Authenticate(fetcher);
    self.request_sender
        .send(request)
        .expect("INTERNAL BUG: Worker has gone away");
}
/// Sets or clears authentication using a caller-supplied token fetcher.
///
/// The optional `fetcher` is forwarded unchanged to the worker in an
/// `Authenticate` request.
///
/// # Panics
/// Panics if the request cannot be sent because the worker is gone.
pub async fn set_auth_callback(&mut self, fetcher: Option<AuthTokenFetcher>) {
    let request = ClientRequest::Authenticate(fetcher);
    self.request_sender
        .send(request)
        .expect("INTERNAL BUG: Worker has gone away");
}
/// Authenticates with a deploy key, optionally acting as a given user
/// identity.
///
/// Installs a fetcher that ignores the force-refresh flag and always yields
/// `AuthenticationToken::Admin(deploy_key, acting_as)`.
///
/// # Panics
/// Panics if the request cannot be sent because the worker is gone.
#[doc(hidden)]
pub async fn set_admin_auth(
    &mut self,
    deploy_key: String,
    acting_as: Option<UserIdentityAttributes>,
) {
    let admin_fetcher: AuthTokenFetcher = Box::new(move |_force_refresh: bool| {
        // Each call gets its own copies so the closure can be invoked repeatedly.
        let key = deploy_key.clone();
        let identity = acting_as.clone();
        Box::pin(async move { Ok(AuthenticationToken::Admin(key, identity)) })
    });
    let request = ClientRequest::Authenticate(Some(admin_fetcher));
    self.request_sender
        .send(request)
        .expect("INTERNAL BUG: Worker has gone away");
}
}
/// Converts a deployment URL into the URL of its sync WebSocket endpoint.
///
/// `http`/`ws` map to `ws` and `https`/`wss` map to `wss`; the path is set to
/// `api/sync`, replacing any path already present.
///
/// # Errors
/// Returns an error for any other scheme. The message names only http and
/// https even though ws and wss are accepted as well.
fn deployment_to_ws_url(mut deployment_url: Url) -> anyhow::Result<Url> {
    let secure = match deployment_url.scheme() {
        "http" | "ws" => false,
        "https" | "wss" => true,
        scheme => anyhow::bail!("Unknown scheme {scheme}. Expected http or https."),
    };
    let ws_scheme = if secure { "wss" } else { "ws" };
    deployment_url
        .set_scheme(ws_scheme)
        .expect("Scheme not supported");
    deployment_url.set_path("api/sync");
    Ok(deployment_url)
}
/// Builder collecting the options needed to construct a `ConvexClient`.
pub struct ConvexClientBuilder {
/// Deployment URL as given by the caller; parsed only when the client is
/// built.
deployment_url: String,
/// Optional client id override. When `None`, the client derives one from the
/// crate version.
client_id: Option<String>,
/// Optional sender for `WebSocketState` values, handed to the WebSocket
/// manager when the connection is opened.
on_state_change: Option<mpsc::Sender<WebSocketState>>,
}
impl ConvexClientBuilder {
    /// Starts a builder for the deployment at `deployment_url`, with no client
    /// id override and no state-change sender.
    pub fn new(deployment_url: &str) -> Self {
        let deployment_url = deployment_url.to_owned();
        Self {
            deployment_url,
            client_id: None,
            on_state_change: None,
        }
    }

    /// Overrides the client id passed along when the connection is opened.
    pub fn with_client_id(self, client_id: &str) -> Self {
        Self {
            client_id: Some(client_id.to_owned()),
            ..self
        }
    }

    /// Registers a sender for `WebSocketState` values; it is forwarded to the
    /// WebSocket manager when the client is built.
    pub fn with_on_state_change(self, on_state_change: mpsc::Sender<WebSocketState>) -> Self {
        Self {
            on_state_change: Some(on_state_change),
            ..self
        }
    }

    /// Consumes the builder and connects, returning the resulting
    /// `ConvexClient`.
    ///
    /// # Errors
    /// Propagates any error from `ConvexClient::new_from_builder`.
    pub async fn build(self) -> anyhow::Result<ConvexClient> {
        let client = ConvexClient::new_from_builder(self).await?;
        Ok(client)
    }
}
#[cfg(test)]
pub mod tests {
use std::{
str::FromStr,
sync::Arc,
time::Duration,
};
use convex_sync_types::{
types::SerializedArgs,
AuthenticationToken,
ClientMessage,
LogLinesMessage,
Query,
QueryId,
QuerySetModification,
SessionId,
StateModification,
StateVersion,
UdfPath,
UserIdentityAttributes,
};
use futures::StreamExt;
use maplit::btreemap;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::sync::{
broadcast,
mpsc,
};
use super::ConvexClient;
use crate::{
base_client::FunctionResult,
client::{
deployment_to_ws_url,
worker::worker,
BaseConvexClient,
},
sync::{
testing::TestProtocolManager,
ServerMessage,
SyncProtocol,
},
value::Value,
QuerySubscription,
};
impl ConvexClient {
    /// Builds a client wired to a `TestProtocolManager` instead of a real
    /// WebSocket, returning both so tests can inspect sent messages and inject
    /// server responses.
    pub async fn with_test_protocol() -> anyhow::Result<(Self, TestProtocolManager)> {
        // The result of installing the tracing subscriber is deliberately
        // ignored.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Same channel layout as the production constructor.
        let (server_tx, server_rx) = mpsc::channel(1);
        let (client_tx, client_rx) = mpsc::unbounded_channel();
        let (results_tx, results_rx) = broadcast::channel(1);

        let protocol =
            TestProtocolManager::open("ws://test.com".parse()?, server_tx, None, "rust-0.0.1")
                .await?;
        let base_client = BaseConvexClient::new();
        // The worker gets a clone; the original manager goes back to the test.
        let worker_task = tokio::spawn(worker(
            server_rx,
            client_rx,
            results_tx,
            base_client,
            protocol.clone(),
        ));

        let client = ConvexClient {
            listen_handle: Some(Arc::new(worker_task)),
            request_sender: client_tx,
            watch_receiver: results_rx,
        };
        Ok((client, protocol))
    }
}
/// Builds the pair of server messages that complete a mutation in tests: the
/// `MutationResponse` for request 0 carrying `result`, and an empty
/// `Transition` from the initial state version whose end timestamp matches
/// the response's `ts`.
fn fake_mutation_response(result: FunctionResult) -> (ServerMessage, ServerMessage) {
    let (transition, version_after) = fake_transition(StateVersion::initial(), Vec::new());
    let response = ServerMessage::MutationResponse {
        request_id: 0,
        result: result.into(),
        ts: Some(version_after.ts),
        log_lines: LogLinesMessage(Vec::new()),
    };
    (response, transition)
}
/// Builds an `ActionResponse` server message for request 0 carrying `result`
/// and no log lines.
fn fake_action_response(result: FunctionResult) -> ServerMessage {
    let log_lines = LogLinesMessage(Vec::new());
    let result = result.into();
    ServerMessage::ActionResponse {
        request_id: 0,
        result,
        log_lines,
    }
}
/// Builds a `Transition` server message that advances `start_version` to the
/// next timestamp and reports each `(query_id, value)` pair as a
/// `QueryUpdated` modification.
///
/// Returns the message together with the resulting end version, so callers
/// can chain further transitions.
fn fake_transition(
    start_version: StateVersion,
    modifications: Vec<(QueryId, Value)>,
) -> (ServerMessage, StateVersion) {
    let next_ts = start_version.ts.succ().expect("Succ failed");
    let end_version = StateVersion {
        ts: next_ts,
        ..start_version
    };
    let as_update = |(query_id, value): (QueryId, Value)| StateModification::QueryUpdated {
        query_id,
        value,
        journal: None,
        log_lines: LogLinesMessage(vec![]),
    };
    let transition = ServerMessage::Transition {
        start_version,
        end_version,
        modifications: modifications.into_iter().map(as_update).collect(),
        client_clock_skew: None,
        server_ts: None,
    };
    (transition, end_version)
}
/// A successful mutation sends a `Mutation` message and only resolves once
/// both the `MutationResponse` and the following `Transition` have arrived.
#[tokio::test]
async fn test_mutation() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup so only the mutation is inspected below.
test_protocol.take_sent().await;
// Run the mutation on its own task so the test can play the server side.
let mut res =
tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Mutation {
request_id: 0,
udf_path: UdfPath::from_str("incrementCounter")?,
args: SerializedArgs::from_args(vec![json!({})])?,
component_path: None,
}]
);
let mutation_result = FunctionResult::Value(Value::Null);
let (mut_resp, transition) = fake_mutation_response(mutation_result.clone());
test_protocol.fake_server_response(mut_resp).await?;
// With only the response delivered the mutation must still be pending:
// the timeout is expected to elapse.
tokio::time::timeout(Duration::from_millis(50), &mut res)
.await
.unwrap_err();
// Delivering the transition lets the mutation complete.
test_protocol.fake_server_response(transition).await?;
assert_eq!(res.await??, mutation_result);
Ok(())
}
/// A mutation whose response is an error message resolves as soon as the
/// `MutationResponse` arrives; no `Transition` is delivered in this test.
#[tokio::test]
async fn test_mutation_error() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
let res =
tokio::spawn(async move { client.mutation("incrementCounter", btreemap! {}).await });
test_protocol.wait_until_n_messages_sent(1).await;
// The outgoing mutation message itself is not checked here.
test_protocol.take_sent().await;
let mutation_result = FunctionResult::ErrorMessage("JEEPERS".into());
// The transition half of the pair is intentionally unused.
let (mut_resp, _transition) = fake_mutation_response(mutation_result.clone());
test_protocol.fake_server_response(mut_resp).await?;
assert_eq!(res.await??, mutation_result);
Ok(())
}
/// An action sends an `Action` message and resolves with the result carried
/// by the `ActionResponse`.
#[tokio::test]
async fn test_action() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
let action_result = FunctionResult::Value(Value::Null);
let server_message = fake_action_response(action_result.clone());
// Run the action on its own task so the test can play the server side.
let res = tokio::spawn(async move { client.action("runAction:hello", btreemap! {}).await });
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Action {
request_id: 0,
udf_path: UdfPath::from_str("runAction:hello")?,
args: SerializedArgs::from_args(vec![json!({})])?,
component_path: None,
}]
);
test_protocol.fake_server_response(server_message).await?;
assert_eq!(res.await??, action_result);
Ok(())
}
/// Each auth change produces exactly one `Authenticate` message, with
/// `base_version` increasing by one per change (0, 1, 2, 3).
#[tokio::test]
async fn test_auth() -> anyhow::Result<()> {
let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
// 1. User token.
client.set_auth(Some("myauthtoken".into())).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 0,
token: AuthenticationToken::User("myauthtoken".into()),
}]
);
// 2. Clearing auth sends `AuthenticationToken::None`.
client.set_auth(None).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 1,
token: AuthenticationToken::None,
}]
);
// 3. Admin auth without an acting-as identity.
client.set_admin_auth("myadminauth".into(), None).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 2,
token: AuthenticationToken::Admin("myadminauth".into(), None),
}]
);
// 4. Admin auth acting as a specific user identity.
let acting_as = UserIdentityAttributes {
name: Some("Barbara Liskov".into()),
..Default::default()
};
client
.set_admin_auth("myadminauth".into(), Some(acting_as.clone()))
.await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 3,
token: AuthenticationToken::Admin("myadminauth".into(), Some(acting_as)),
}]
);
Ok(())
}
/// A caller-supplied fetcher is invoked and its token is sent; passing `None`
/// afterwards sends `AuthenticationToken::None`.
#[tokio::test]
async fn test_auth_callback() -> anyhow::Result<()> {
let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
let fetcher: crate::client::AuthTokenFetcher = Box::new(|_force_refresh| {
Box::pin(async { Ok(AuthenticationToken::User("callback_token".into())) })
});
client.set_auth_callback(Some(fetcher)).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 0,
token: AuthenticationToken::User("callback_token".into()),
}]
);
// Removing the callback clears authentication.
client.set_auth_callback(None).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 1,
token: AuthenticationToken::None,
}]
);
Ok(())
}
/// A fetcher that itself yields `AuthenticationToken::None` still results in
/// an `Authenticate` message carrying that token.
#[tokio::test]
async fn test_auth_callback_returning_none() -> anyhow::Result<()> {
let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
let fetcher: crate::client::AuthTokenFetcher =
Box::new(|_force_refresh| Box::pin(async { Ok(AuthenticationToken::None) }));
client.set_auth_callback(Some(fetcher)).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 0,
token: AuthenticationToken::None,
}]
);
Ok(())
}
/// `set_auth` with a static token produces the same `Authenticate` messages
/// as the callback-based API: the token first, then `None` when cleared.
#[tokio::test]
async fn test_set_auth_uses_callback_path() -> anyhow::Result<()> {
let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
// Drain anything sent during setup.
test_protocol.take_sent().await;
client.set_auth(Some("static_token".into())).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 0,
token: AuthenticationToken::User("static_token".into()),
}]
);
// Clearing the token.
client.set_auth(None).await;
test_protocol.wait_until_n_messages_sent(1).await;
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::Authenticate {
base_version: 1,
token: AuthenticationToken::None,
}]
);
Ok(())
}
/// Full life cycle of one subscription: the `Add` is sent after the initial
/// `Connect`, a transition delivers a value, `query` on the same function
/// returns that value, and dropping the subscription sends a `Remove`.
#[tokio::test]
async fn test_client_single_subscription() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
let query_id = subscription1.query_id();
// Nothing was drained beforehand, so the initial `Connect` is part of the
// sent messages.
assert_eq!(
test_protocol.take_sent().await,
vec![
ClientMessage::Connect {
session_id: SessionId::nil(),
connection_count: 0,
last_close_reason: "InitialConnect".to_string(),
max_observed_timestamp: None,
client_ts: None,
},
ClientMessage::ModifyQuerySet {
base_version: 0,
new_version: 1,
modifications: vec![QuerySetModification::Add(Query {
query_id,
udf_path: "getValue1".parse()?,
args: SerializedArgs::from_args(vec![json!({})])?,
journal: None,
component_path: None,
})]
},
]
);
// Server reports the value 10 for the subscribed query.
test_protocol
.fake_server_response(
fake_transition(
StateVersion::initial(),
vec![(subscription1.query_id(), 10.into())],
)
.0,
)
.await?;
assert_eq!(
subscription1.next().await,
Some(FunctionResult::Value(10.into()))
);
// No further server response is injected before this one-shot query; it
// must return the value already delivered.
assert_eq!(
client.query("getValue1", btreemap! {}).await?,
FunctionResult::Value(10.into())
);
drop(subscription1);
test_protocol.wait_until_n_messages_sent(1).await;
// Exactly one `Remove` is expected, taking the query set from version 1 to 2.
assert_eq!(
test_protocol.take_sent().await,
vec![ClientMessage::ModifyQuerySet {
base_version: 1,
new_version: 2,
modifications: vec![QuerySetModification::Remove { query_id }],
}]
);
Ok(())
}
/// Dropping one of several subscriptions to the same query, then subscribing
/// again, leaves the surviving subscriptions able to read results from
/// `watch_all`.
#[tokio::test]
async fn test_client_subscribe_unsubscribe_subscribe() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
let subscription1b: QuerySubscription;
{
// `_ignored` is dropped at the end of this scope while `subscription1b`
// (same query) outlives it.
let _ignored = client.subscribe("getValue1", btreemap! {}).await?;
subscription1b = client.subscribe("getValue1", btreemap! {}).await?;
}
let subscription1c = client.subscribe("getValue1", btreemap! {}).await?;
// Outgoing messages are not under test here.
test_protocol.take_sent().await;
let mut watch = client.watch_all();
test_protocol
.fake_server_response(
fake_transition(StateVersion::initial(), vec![(QueryId::new(0), 10.into())]).0,
)
.await?;
let results = watch.next().await.expect("Watch should have results");
// Both remaining subscriptions see the value reported for query id 0.
assert_eq!(
results.get(&subscription1b),
Some(&FunctionResult::Value(10.into()))
);
assert_eq!(
results.get(&subscription1c),
Some(&FunctionResult::Value(10.into()))
);
Ok(())
}
/// `watch_all` yields one snapshot covering every subscription: queries with
/// a reported value return it, and a query without a value yet maps to `None`.
#[tokio::test]
async fn test_client_consistent_view_watch() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
// Two subscriptions (2a, 2b) target the same function and arguments.
let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
let subscription2a = client.subscribe("getValue2", btreemap! {}).await?;
let subscription2b = client.subscribe("getValue2", btreemap! {}).await?;
let subscription3 = client.subscribe("getValue3", btreemap! {}).await?;
// Outgoing messages are not under test here.
test_protocol.take_sent().await;
let mut watch = client.watch_all();
// The transition reports values for query ids 0 and 1 only.
test_protocol
.fake_server_response(
fake_transition(
StateVersion::initial(),
vec![(QueryId::new(0), 10.into()), (QueryId::new(1), 20.into())],
)
.0,
)
.await?;
let results = watch.next().await.expect("Watch should have results");
assert_eq!(
results.get(&subscription1),
Some(&FunctionResult::Value(10.into()))
);
assert_eq!(
results.get(&subscription2a),
Some(&FunctionResult::Value(20.into()))
);
assert_eq!(
results.get(&subscription2b),
Some(&FunctionResult::Value(20.into()))
);
assert_eq!(results.get(&subscription3), None);
// Iteration yields one entry per subscriber, in subscription order here.
assert_eq!(
results.iter().collect::<Vec<_>>(),
vec![
(subscription1.id(), Some(&FunctionResult::Value(10.into()))),
(subscription2a.id(), Some(&FunctionResult::Value(20.into()))),
(subscription2b.id(), Some(&FunctionResult::Value(20.into()))),
(subscription3.id(), None,),
]
);
Ok(())
}
/// Dropping the only client ends outstanding subscription streams: `next`
/// yields `None`, and dropping the subscription afterwards does not panic.
#[tokio::test]
async fn test_drop_client() -> anyhow::Result<()> {
let (mut client, _test_protocol) = ConvexClient::with_test_protocol().await?;
let mut subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
drop(client);
// Yield once to the runtime before polling the subscription.
tokio::task::yield_now().await;
assert!(subscription1.next().await.is_none());
drop(subscription1);
Ok(())
}
/// Subscriptions differing in function path or arguments get distinct query
/// ids, and each one sends its own `ModifyQuerySet` `Add` with consecutive
/// versions.
#[tokio::test]
async fn test_client_separate_queries() -> anyhow::Result<()> {
let (mut client, test_protocol) = ConvexClient::with_test_protocol().await?;
let subscription1 = client.subscribe("getValue1", btreemap! {}).await?;
let subscription2 = client.subscribe("getValue2", btreemap! {}).await?;
// Same function as subscription2 but with different arguments.
let subscription3 = client
.subscribe("getValue2", btreemap! {"hello".into() => "world".into()})
.await?;
assert_ne!(subscription1.query_id(), subscription2.query_id());
assert_ne!(subscription2.query_id(), subscription3.query_id());
// Initial `Connect` followed by one `Add` per distinct query.
assert_eq!(
test_protocol.take_sent().await,
vec![
ClientMessage::Connect {
session_id: SessionId::nil(),
connection_count: 0,
last_close_reason: "InitialConnect".to_string(),
max_observed_timestamp: None,
client_ts: None,
},
ClientMessage::ModifyQuerySet {
base_version: 0,
new_version: 1,
modifications: vec![QuerySetModification::Add(Query {
query_id: subscription1.query_id(),
udf_path: "getValue1".parse()?,
args: SerializedArgs::from_args(vec![json!({})])?,
journal: None,
component_path: None,
})]
},
ClientMessage::ModifyQuerySet {
base_version: 1,
new_version: 2,
modifications: vec![QuerySetModification::Add(Query {
query_id: subscription2.query_id(),
udf_path: "getValue2".parse()?,
args: SerializedArgs::from_args(vec![json!({})])?,
journal: None,
component_path: None,
})]
},
ClientMessage::ModifyQuerySet {
base_version: 2,
new_version: 3,
modifications: vec![QuerySetModification::Add(Query {
query_id: subscription3.query_id(),
udf_path: "getValue2".parse()?,
args: SerializedArgs::from_args(vec![json!({"hello": "world"})])?,
journal: None,
component_path: None,
})]
},
]
);
Ok(())
}
/// Identical subscriptions share one query id (a single `Add` is sent) but
/// have distinct subscriber ids; all of them observe every update, a late
/// subscriber starts from the latest value, and it keeps receiving updates
/// after the earlier subscribers are dropped.
#[tokio::test]
async fn test_client_two_identical_queries() -> anyhow::Result<()> {
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
let mut subscription1 = client.subscribe("getValue", btreemap! {}).await?;
let mut subscription2 = client.subscribe("getValue", btreemap! {}).await?;
assert_ne!(subscription1.subscriber_id, subscription2.subscriber_id);
assert_eq!(subscription1.query_id(), subscription2.query_id());
let query_id = subscription1.query_id();
// Only one `ModifyQuerySet` follows the initial `Connect`.
assert_eq!(
test_protocol.take_sent().await,
vec![
ClientMessage::Connect {
session_id: SessionId::nil(),
connection_count: 0,
last_close_reason: "InitialConnect".to_string(),
max_observed_timestamp: None,
client_ts: None,
},
ClientMessage::ModifyQuerySet {
base_version: 0,
new_version: 1,
modifications: vec![QuerySetModification::Add(Query {
query_id,
udf_path: "getValue".parse()?,
args: SerializedArgs::from_args(vec![json!({})])?,
journal: None,
component_path: None,
})]
},
]
);
// Feed values 1..=4 through chained transitions; both subscriptions must
// see each one.
let mut version = StateVersion::initial();
for i in 1..5 {
let (transition, new_version) = fake_transition(version, vec![(query_id, i.into())]);
test_protocol.fake_server_response(transition).await?;
version = new_version;
assert_eq!(
subscription1.next().await,
Some(FunctionResult::Value(i.into()))
);
assert_eq!(
subscription2.next().await,
Some(FunctionResult::Value(i.into()))
);
}
// A subscriber joining now gets the most recent value (4) without a new
// server response.
let mut subscription3 = client.subscribe("getValue", btreemap! {}).await?;
assert_eq!(
subscription3.next().await,
Some(FunctionResult::Value(4.into())),
);
drop(subscription1);
drop(subscription2);
// The remaining subscriber still receives further updates.
let (transition, _new_version) = fake_transition(version, vec![(query_id, 5.into())]);
test_protocol.fake_server_response(transition).await?;
assert_eq!(
subscription3.next().await,
Some(FunctionResult::Value(5.into())),
);
Ok(())
}
/// `deployment_to_ws_url` maps http/ws to `ws`, https/wss to `wss`, appends
/// the `/api/sync` path, and rejects other schemes with a fixed message.
#[test]
fn test_deployment_url() -> anyhow::Result<()> {
assert_eq!(
deployment_to_ws_url("http://flying-shark-123.convex.cloud".parse()?)?.to_string(),
"ws://flying-shark-123.convex.cloud/api/sync",
);
assert_eq!(
deployment_to_ws_url("https://flying-shark-123.convex.cloud".parse()?)?.to_string(),
"wss://flying-shark-123.convex.cloud/api/sync",
);
// WebSocket schemes are accepted as-is.
assert_eq!(
deployment_to_ws_url("ws://flying-shark-123.convex.cloud".parse()?)?.to_string(),
"ws://flying-shark-123.convex.cloud/api/sync",
);
assert_eq!(
deployment_to_ws_url("wss://flying-shark-123.convex.cloud".parse()?)?.to_string(),
"wss://flying-shark-123.convex.cloud/api/sync",
);
// Any other scheme is an error; the exact message is pinned.
assert_eq!(
deployment_to_ws_url("ftp://flying-shark-123.convex.cloud".parse()?)
.unwrap_err()
.to_string(),
"Unknown scheme ftp. Expected http or https.",
);
Ok(())
}
}