surrealdb 2.1.6

A scalable, distributed, collaborative, document-graph database, for the realtime web
Documentation
use super::Client;
use crate::api::conn::Connection;
use crate::api::conn::Route;
use crate::api::conn::Router;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::OnceLockExt;
use crate::api::Result;
use crate::api::Surreal;
use crate::opt::WaitFor;
use channel::{Receiver, Sender};
use indexmap::IndexMap;
use reqwest::header::HeaderMap;
use reqwest::ClientBuilder;
use std::collections::HashSet;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::sync::OnceLock;
use tokio::sync::watch;
use url::Url;
use wasm_bindgen_futures::spawn_local;

// Implements the `crate::api::Connection` trait for the HTTP `Client`.
// The impl body is empty: no trait items are defined or overridden here.
impl crate::api::Connection for Client {}

impl Connection for Client {
	/// Connects to the endpoint at `address` and returns a `Surreal` handle
	/// backed by a freshly spawned router task.
	///
	/// `capacity` sizes the request channel between the handle and the router:
	/// `0` selects an unbounded channel, any other value a bounded channel of
	/// that size.
	///
	/// The returned future resolves only after the router has reported the
	/// outcome of its initial connection attempt. An error is returned if that
	/// report cannot be received or if the router reported a failure.
	fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
		Box::pin(async move {
			// Channel carrying requests from the `Surreal` handle to the router.
			let (request_tx, request_rx) = if capacity == 0 {
				channel::unbounded()
			} else {
				channel::bounded(capacity)
			};

			// Single-slot channel on which the router reports whether the
			// initial connection attempt succeeded.
			let (ready_tx, ready_rx) = channel::bounded(1);

			// Run the router as a task on the local executor.
			spawn_local(run_router(address, ready_tx, request_rx));

			// First `?`: receiving the report failed.
			// Second `?`: the router reported a connection error.
			let handshake = ready_rx.recv().await?;
			handshake?;

			let router = Router {
				features: HashSet::new(),
				sender: request_tx,
				last_id: AtomicI64::new(0),
			};
			let waiter = watch::channel(Some(WaitFor::Connection));

			Ok(Surreal::new_from_router_waiter(
				Arc::new(OnceLock::with_value(router)),
				Arc::new(waiter),
			))
		})
	}
}

/// Builds the `reqwest` client used by the router and checks that the server
/// is reachable.
///
/// The client is configured with the headers returned by
/// `super::default_headers()`. A GET request for the URL obtained by joining
/// `"health"` onto `base_url` is then passed to `super::health`.
///
/// Returns an error if the client cannot be built, the health URL cannot be
/// formed, or `super::health` fails.
async fn client(base_url: &Url) -> Result<reqwest::Client> {
	let http = ClientBuilder::new().default_headers(super::default_headers()).build()?;

	// Check the server before handing the client back to the caller.
	let health_url = base_url.join("health")?;
	super::health(http.get(health_url)).await?;

	Ok(http)
}

/// Drives the HTTP router: connects to `address`, reports the outcome on
/// `conn_tx`, then serves every request arriving on `route_rx`.
///
/// * `address` - endpoint whose `url` is used as the base URL for all requests.
/// * `conn_tx` - receives `Ok(())` once the client has been created, or the
///   error that prevented it. If an error is reported the function returns
///   without serving any requests.
/// * `route_rx` - source of incoming routes; the loop ends when receiving
///   from it fails.
///
/// Failures to send on `conn_tx` or on a route's response channel are
/// deliberately ignored.
pub(crate) async fn run_router(
	address: Endpoint,
	conn_tx: Sender<Result<()>>,
	route_rx: Receiver<Route>,
) {
	let base_url = address.url;

	// Split the connection attempt into the client (if any) and the status
	// reported back to the caller of `connect`.
	let (http, status) = match client(&base_url).await {
		Ok(http) => (Some(http), Ok(())),
		Err(error) => (None, Err(error)),
	};
	let _ = conn_tx.send(status).await;
	let Some(http) = http else {
		return;
	};

	// Mutable state handed to `super::router` on every request.
	let mut headers = HeaderMap::new();
	let mut vars = IndexMap::new();
	let mut auth = None;

	loop {
		let Ok(route) = route_rx.recv().await else {
			break;
		};

		let outcome =
			super::router(route.request, &base_url, &http, &mut headers, &mut vars, &mut auth)
				.await;

		// Forward the result, success or failure, unchanged to the requester.
		let _ = route.response.send(outcome).await;
	}
}