surrealdb/api/engine/any/
native.rs

1use crate::api::conn::Connection;
2use crate::api::conn::Router;
3#[allow(unused_imports)] // used by the DB engines
4use crate::api::engine;
5use crate::api::engine::any::Any;
6#[cfg(feature = "protocol-http")]
7use crate::api::engine::remote::http;
8use crate::api::err::Error;
9use crate::api::method::BoxFuture;
10#[cfg(any(feature = "native-tls", feature = "rustls"))]
11#[cfg(feature = "protocol-http")]
12use crate::api::opt::Tls;
13use crate::api::opt::{Endpoint, EndpointKind};
14#[allow(unused_imports)] // used by the DB engines
15use crate::api::ExtraFeatures;
16use crate::api::OnceLockExt;
17use crate::api::Result;
18use crate::api::Surreal;
19#[allow(unused_imports)]
20use crate::error::Db as DbError;
21use crate::opt::WaitFor;
22#[cfg(feature = "protocol-http")]
23use reqwest::ClientBuilder;
24use std::collections::HashSet;
25use std::sync::atomic::AtomicI64;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use tokio::sync::watch;
29#[cfg(feature = "protocol-ws")]
30use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
31#[cfg(feature = "protocol-ws")]
32#[cfg(any(feature = "native-tls", feature = "rustls"))]
33use tokio_tungstenite::Connector;
34
35impl crate::api::Connection for Any {}
36
37impl Connection for Any {
38	#[allow(unused_variables, unreachable_code, unused_mut)] // these are all used depending on feature
39	fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>> {
40		Box::pin(async move {
41			let (route_tx, route_rx) = match capacity {
42				0 => channel::unbounded(),
43				capacity => channel::bounded(capacity),
44			};
45
46			let (conn_tx, conn_rx) = channel::bounded::<Result<()>>(1);
47			let mut features = HashSet::new();
48
49			match EndpointKind::from(address.url.scheme()) {
50				EndpointKind::FoundationDb => {
51					#[cfg(feature = "kv-fdb")]
52					{
53						features.insert(ExtraFeatures::Backup);
54						features.insert(ExtraFeatures::LiveQueries);
55						tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
56						conn_rx.recv().await??
57					}
58
59					#[cfg(not(feature = "kv-fdb"))]
60					return Err(
61						DbError::Ds("Cannot connect to the `foundationdb` storage engine as it is not enabled in this build of SurrealDB".to_owned()).into()
62					);
63				}
64
65				EndpointKind::Memory => {
66					#[cfg(feature = "kv-mem")]
67					{
68						features.insert(ExtraFeatures::Backup);
69						features.insert(ExtraFeatures::LiveQueries);
70						tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
71						conn_rx.recv().await??
72					}
73
74					#[cfg(not(feature = "kv-mem"))]
75					return Err(
76						DbError::Ds("Cannot connect to the `memory` storage engine as it is not enabled in this build of SurrealDB".to_owned()).into()
77					);
78				}
79
80				EndpointKind::File | EndpointKind::RocksDb => {
81					#[cfg(feature = "kv-rocksdb")]
82					{
83						features.insert(ExtraFeatures::Backup);
84						features.insert(ExtraFeatures::LiveQueries);
85						tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
86						conn_rx.recv().await??
87					}
88
89					#[cfg(not(feature = "kv-rocksdb"))]
90					return Err(DbError::Ds(
91						"Cannot connect to the `rocksdb` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
92					)
93					.into());
94				}
95
96				EndpointKind::TiKv => {
97					#[cfg(feature = "kv-tikv")]
98					{
99						features.insert(ExtraFeatures::Backup);
100						features.insert(ExtraFeatures::LiveQueries);
101						tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
102						conn_rx.recv().await??
103					}
104
105					#[cfg(not(feature = "kv-tikv"))]
106					return Err(
107						DbError::Ds("Cannot connect to the `tikv` storage engine as it is not enabled in this build of SurrealDB".to_owned()).into()
108					);
109				}
110
111				EndpointKind::SurrealKV => {
112					#[cfg(feature = "kv-surrealkv")]
113					{
114						features.insert(ExtraFeatures::Backup);
115						features.insert(ExtraFeatures::LiveQueries);
116						tokio::spawn(engine::local::native::run_router(address, conn_tx, route_rx));
117						conn_rx.recv().await??
118					}
119
120					#[cfg(not(feature = "kv-surrealkv"))]
121					return Err(DbError::Ds(
122						"Cannot connect to the `surrealkv` storage engine as it is not enabled in this build of SurrealDB".to_owned(),
123					)
124					.into());
125				}
126
127				EndpointKind::Http | EndpointKind::Https => {
128					#[cfg(feature = "protocol-http")]
129					{
130						features.insert(ExtraFeatures::Backup);
131						let headers = http::default_headers();
132						#[allow(unused_mut)]
133						let mut builder = ClientBuilder::new().default_headers(headers);
134						#[cfg(any(feature = "native-tls", feature = "rustls"))]
135						if let Some(tls) = address.config.tls_config {
136							builder = match tls {
137								#[cfg(feature = "native-tls")]
138								Tls::Native(config) => builder.use_preconfigured_tls(config),
139								#[cfg(feature = "rustls")]
140								Tls::Rust(config) => builder.use_preconfigured_tls(config),
141							};
142						}
143						let client = builder.build()?;
144						let base_url = address.url;
145						engine::remote::http::health(client.get(base_url.join("health")?)).await?;
146						tokio::spawn(engine::remote::http::native::run_router(
147							base_url, client, route_rx,
148						));
149					}
150
151					#[cfg(not(feature = "protocol-http"))]
152					return Err(DbError::Ds(
153						"Cannot connect to the `HTTP` remote engine as it is not enabled in this build of SurrealDB".to_owned(),
154					)
155					.into());
156				}
157
158				EndpointKind::Ws | EndpointKind::Wss => {
159					#[cfg(feature = "protocol-ws")]
160					{
161						features.insert(ExtraFeatures::LiveQueries);
162						let mut endpoint = address;
163						endpoint.url = endpoint.url.join(engine::remote::ws::PATH)?;
164						#[cfg(any(feature = "native-tls", feature = "rustls"))]
165						let maybe_connector = endpoint.config.tls_config.clone().map(Connector::from);
166						#[cfg(not(any(feature = "native-tls", feature = "rustls")))]
167						let maybe_connector = None;
168
169						let config = WebSocketConfig {
170							max_message_size: Some(engine::remote::ws::native::MAX_MESSAGE_SIZE),
171							max_frame_size: Some(engine::remote::ws::native::MAX_FRAME_SIZE),
172							max_write_buffer_size: engine::remote::ws::native::MAX_MESSAGE_SIZE,
173							..Default::default()
174						};
175						let socket = engine::remote::ws::native::connect(
176							&endpoint,
177							Some(config),
178							maybe_connector.clone(),
179						)
180						.await?;
181						tokio::spawn(engine::remote::ws::native::run_router(
182							endpoint,
183							maybe_connector,
184							capacity,
185							config,
186							socket,
187							route_rx,
188						));
189					}
190
191					#[cfg(not(feature = "protocol-ws"))]
192					return Err(DbError::Ds(
193						"Cannot connect to the `WebSocket` remote engine as it is not enabled in this build of SurrealDB".to_owned(),
194					)
195					.into());
196				}
197				EndpointKind::Unsupported(v) => return Err(Error::Scheme(v).into()),
198			}
199
200			Ok(Surreal::new_from_router_waiter(
201				Arc::new(OnceLock::with_value(Router {
202					features,
203					sender: route_tx,
204					last_id: AtomicI64::new(0),
205				})),
206				Arc::new(watch::channel(Some(WaitFor::Connection))),
207			))
208		})
209	}
210}