use std::borrow::Cow;
use std::future::IntoFuture;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_channel::Receiver;
use futures::{Stream, StreamExt};
use semver::Version;
use surrealdb_core::kvs::export::{Config as DbExportConfig, TableConfig};
use crate::conn::{Command, MlExportConfig};
use crate::method::{BoxFuture, ExportConfig as Config, Model, OnceLockExt};
use crate::{Connection, Error, ExtraFeatures, Result, Surreal};
/// A pending export request that runs when awaited.
///
/// `R` is the type of the export target: with `PathBuf` the future sends a
/// file export command and resolves to `()`, with `()` it resolves to a
/// [`Backup`] stream of bytes. `T` is a compile-time marker for the kind of
/// export being built; it defaults to `()`, becomes `Model` after
/// [`Export::ml`] and `Config` after [`Export::with_config`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Export<'r, C: Connection, R, T = ()> {
/// Client the export is issued through, either borrowed or owned.
pub(super) client: Cow<'r, Surreal<C>>,
/// Export target; for `PathBuf` this is sent as the `path` of the command.
pub(super) target: R,
/// ML export settings; when `Some`, the ML export command is sent instead
/// of the database export command.
pub(super) ml_config: Option<MlExportConfig>,
/// Optional database export configuration forwarded with the database
/// export command.
pub(super) db_config: Option<DbExportConfig>,
/// Zero-sized marker carrying the target type `R`.
pub(super) response: PhantomData<R>,
/// Zero-sized marker carrying the export kind `T`.
pub(super) export_type: PhantomData<T>,
}
impl<'r, C: Connection, R> Export<'r, C, R> {
    /// Turns this request into an ML export for the given `name` and `version`.
    ///
    /// The name and the string form of the version are stored in the ML
    /// configuration; any database configuration already present is carried
    /// over unchanged. The returned builder is marked with the `Model` type.
    pub fn ml(self, name: &str, version: Version) -> Export<'r, C, R, Model> {
        let Self {
            client,
            target,
            db_config,
            response,
            ..
        } = self;
        let ml_config = Some(MlExportConfig {
            name: name.to_owned(),
            version: version.to_string(),
        });
        Export {
            client,
            target,
            ml_config,
            db_config,
            response,
            export_type: PhantomData,
        }
    }

    /// Attaches a default database export configuration to this request.
    ///
    /// The returned builder is marked with the `Config` type, which is the
    /// type the individual option setters are implemented for. Any ML
    /// configuration already present is carried over unchanged.
    pub fn with_config(self) -> Export<'r, C, R, Config> {
        let Self {
            client,
            target,
            ml_config,
            response,
            ..
        } = self;
        Export {
            client,
            target,
            ml_config,
            db_config: Some(Default::default()),
            response,
            export_type: PhantomData,
        }
    }
}
/// Option setters for a configured export.
///
/// Each setter writes one field of the database export configuration and
/// returns the builder. If no configuration is present the call leaves the
/// builder untouched.
impl<C: Connection, R> Export<'_, C, R, Config> {
    /// Sets the `users` option of the export configuration.
    pub fn users(mut self, users: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.users = users;
        }
        self
    }

    /// Sets the `accesses` option of the export configuration.
    pub fn accesses(mut self, accesses: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.accesses = accesses;
        }
        self
    }

    /// Sets the `params` option of the export configuration.
    pub fn params(mut self, params: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.params = params;
        }
        self
    }

    /// Sets the `functions` option of the export configuration.
    pub fn functions(mut self, functions: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.functions = functions;
        }
        self
    }

    /// Sets the `analyzers` option of the export configuration.
    pub fn analyzers(mut self, analyzers: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.analyzers = analyzers;
        }
        self
    }

    /// Sets the `versions` option of the export configuration.
    pub fn versions(mut self, versions: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.versions = versions;
        }
        self
    }

    /// Sets the `tables` option of the export configuration from anything
    /// convertible into a `TableConfig`.
    pub fn tables(mut self, tables: impl Into<TableConfig>) -> Self {
        if let Some(config) = &mut self.db_config {
            config.tables = tables.into();
        }
        self
    }

    /// Sets the `records` option of the export configuration.
    pub fn records(mut self, records: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.records = records;
        }
        self
    }

    /// Sets the `apis` option of the export configuration.
    pub fn apis(mut self, apis: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.apis = apis;
        }
        self
    }

    /// Sets the `buckets` option of the export configuration.
    pub fn buckets(mut self, buckets: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.buckets = buckets;
        }
        self
    }

    /// Sets the `modules` option of the export configuration.
    pub fn modules(mut self, modules: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.modules = modules;
        }
        self
    }

    /// Sets the `configs` option of the export configuration.
    pub fn configs(mut self, configs: bool) -> Self {
        if let Some(config) = &mut self.db_config {
            config.configs = configs;
        }
        self
    }
}
impl<C: Connection, R, T> Export<'_, C, R, T> {
    /// Converts this request into one that owns its client.
    ///
    /// The client is turned into an owned value, which lets the result carry
    /// the `'static` lifetime. Every other field is moved across unchanged.
    pub fn into_owned(self) -> Export<'static, C, R, T> {
        let Self {
            client,
            target,
            ml_config,
            db_config,
            response,
            export_type,
        } = self;
        Export {
            client: Cow::Owned(client.into_owned()),
            target,
            ml_config,
            db_config,
            response,
            export_type,
        }
    }
}
/// Awaiting an export whose target is a `PathBuf` sends a file export command
/// through the client's router and resolves to `()` on success.
impl<'r, Client: Connection, T> IntoFuture for Export<'r, Client, PathBuf, T> {
    type Output = Result<()>;
    type IntoFuture = BoxFuture<'r, Self::Output>;

    /// Builds the future that performs the export.
    ///
    /// # Errors
    ///
    /// Fails if the router cannot be extracted, if the router does not list
    /// the `Backup` feature, or if executing the command fails.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let router = self.client.inner.router.extract()?;

            let backups_supported = router.features.contains(&ExtraFeatures::Backup);
            if !backups_supported {
                return Err(Error::internal(
                    "The protocol or storage engine does not support backups on this architecture"
                        .to_string(),
                ));
            }

            // An ML configuration takes precedence over the database export.
            let command = match self.ml_config {
                Some(config) => Command::ExportMl {
                    path: self.target,
                    config,
                },
                None => Command::ExportFile {
                    path: self.target,
                    config: self.db_config,
                },
            };

            router.execute_unit(self.client.session_id, command).await
        })
    }
}
/// Awaiting an export whose target is `()` hands the sending half of a channel
/// to the router and resolves to a [`Backup`] wrapping the receiving half.
impl<'r, Client: Connection, T> IntoFuture for Export<'r, Client, (), T> {
    type Output = Result<Backup>;
    type IntoFuture = BoxFuture<'r, Self::Output>;

    /// Builds the future that starts the export.
    ///
    /// # Errors
    ///
    /// Fails if the router cannot be extracted, if the router does not list
    /// the `Backup` feature (a warning is logged in that case), or if
    /// executing the command fails.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let router = self.client.inner.router.extract()?;

            let backups_supported = router.features.contains(&ExtraFeatures::Backup);
            if !backups_supported {
                tracing::warn!("Backups are not supported");
                return Err(Error::internal(
                    "The protocol or storage engine does not support backups on this architecture"
                        .to_string(),
                ));
            }

            // Channel of capacity 1: the sender travels with the command, the
            // receiver is returned to the caller inside `Backup`.
            let (tx, rx) = crate::channel::bounded(1);
            let rx = Box::pin(rx);
            tracing::info!("Exporting bytes");

            // An ML configuration takes precedence over the database export.
            let command = match self.ml_config {
                Some(config) => Command::ExportBytesMl {
                    bytes: tx,
                    config,
                },
                None => Command::ExportBytes {
                    bytes: tx,
                    config: self.db_config,
                },
            };

            router.execute_unit(self.client.session_id, command).await?;
            Ok(Backup {
                rx,
            })
        })
    }
}
/// A stream of exported bytes.
///
/// This is the output of awaiting an [`Export`] whose target type is `()`.
/// Each item is a `Result<Vec<u8>>`.
#[derive(Debug, Clone)]
#[must_use = "streams do nothing unless you poll them"]
pub struct Backup {
/// Pinned, boxed receiving half of the channel whose sender was handed to
/// the export command.
rx: Pin<Box<Receiver<Result<Vec<u8>>>>>,
}
/// Yields the items received on the underlying channel.
impl Stream for Backup {
    type Item = Result<Vec<u8>>;

    /// Polls the wrapped receiver for its next item.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Backup` only holds a `Pin<Box<_>>`, so it is `Unpin` and a plain
        // mutable reference can be taken out of the pin.
        let this = self.get_mut();
        this.rx.poll_next_unpin(cx)
    }
}