#![cfg(feature = "kv-mem")]
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::{Key, Val, Version};
use std::fmt::Debug;
use std::ops::Range;
use surrealkv::Options;
use surrealkv::Store;
use surrealkv::Transaction as Tx;
use super::{Check, KeyEncode};
/// A key-value datastore wrapping a `surrealkv` [`Store`].
pub struct Datastore {
	/// The underlying `surrealkv` store from which transactions are started.
	db: Store,
}
/// A transaction on the datastore, wrapping a `surrealkv` transaction.
pub struct Transaction {
	/// Set once the transaction has been finished through `commit` or `cancel`.
	done: bool,
	/// Whether the transaction is allowed to modify data.
	write: bool,
	/// The level at which a dropped, unfinished write transaction is logged.
	check: Check,
	/// The underlying `surrealkv` transaction.
	inner: Tx,
}
impl Drop for Transaction {
	/// Logs a message when a writeable transaction goes out of scope without
	/// having been committed or cancelled.
	///
	/// The log level follows `self.check`: `trace!` for [`Check::None`],
	/// `warn!` for [`Check::Warn`] and `error!` for [`Check::Error`].
	fn drop(&mut self) {
		// Finished and read-only transactions are never reported.
		if self.done || !self.write {
			return;
		}
		match self.check {
			Check::Error => {
				error!("A transaction was dropped without being committed or cancelled");
			}
			Check::Warn => {
				warn!("A transaction was dropped without being committed or cancelled");
			}
			Check::None => {
				trace!("A transaction was dropped without being committed or cancelled");
			}
		}
	}
}
impl Datastore {
	pub(crate) async fn new() -> Result<Datastore, Error> {
		let mut opts = Options::new();
		opts.enable_versions = false;
		opts.disk_persistence = false;
		match Store::new(opts) {
			Ok(db) => Ok(Datastore {
				db,
			}),
			Err(e) => Err(Error::Ds(e.to_string())),
		}
	}
	pub(crate) async fn shutdown(&self) -> Result<(), Error> {
		Ok(())
	}
	pub(crate) async fn transaction(&self, write: bool, _: bool) -> Result<Transaction, Error> {
		#[cfg(not(debug_assertions))]
		let check = Check::Warn;
		#[cfg(debug_assertions)]
		let check = Check::Error;
		match self.db.begin() {
			Ok(inner) => Ok(Transaction {
				done: false,
				check,
				write,
				inner,
			}),
			Err(e) => Err(Error::Tx(e.to_string())),
		}
	}
}
impl super::api::Transaction for Transaction {
	/// Sets the level at which a dropped, unfinished write transaction is logged.
	fn check_level(&mut self, check: Check) {
		self.check = check;
	}
	/// Returns the `done` flag, which `cancel` and `commit` set when they
	/// finish the transaction.
	fn closed(&self) -> bool {
		self.done
	}
	/// Returns whether the transaction was started as writeable.
	fn writeable(&self) -> bool {
		self.write
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self))]
	async fn cancel(&mut self) -> Result<(), Error> {
		if self.done {
			return Err(Error::TxFinished);
		}
		self.done = true;
		self.inner.rollback();
		Ok(())
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self))]
	async fn commit(&mut self) -> Result<(), Error> {
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		self.done = true;
		self.inner.commit().await?;
		Ok(())
	}
	/// Checks whether a key exists in the datastore.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set and
	/// [`Error::TxFinished`] if the transaction has already been finished.
	/// Key encoding and store errors are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		// Versioned queries are rejected before anything else is checked.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		let encoded = key.encode()?;
		let found = self.inner.get(&encoded)?;
		Ok(found.is_some())
	}
	/// Fetches the value stored under a key, if any.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set and
	/// [`Error::TxFinished`] if the transaction has already been finished.
	/// Key encoding and store errors are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		// Versioned reads are rejected up front, so only the plain lookup
		// below can ever be reached.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		let res = self.inner.get(&key.encode()?)?;
		Ok(res)
	}
	/// Inserts or updates a key in the datastore.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set,
	/// [`Error::TxFinished`] if the transaction has already been finished and
	/// [`Error::TxReadonly`] if it is not writeable. Key encoding and store
	/// errors are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
	where
		K: KeyEncode + Sprintable + Debug,
		V: Into<Val> + Debug,
	{
		// Versioned writes are rejected up front, so only the plain write
		// below can ever be reached.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		self.inner.set(&key.encode()?, &val.into())?;
		Ok(())
	}
	/// Inserts a key only if it does not already exist in the datastore.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set,
	/// [`Error::TxFinished`] if the transaction has already been finished,
	/// [`Error::TxReadonly`] if it is not writeable and
	/// [`Error::TxKeyAlreadyExists`] if the key is already present. Key
	/// encoding and store errors are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
	where
		K: KeyEncode + Sprintable + Debug,
		V: Into<Val> + Debug,
	{
		// Versioned writes are rejected up front, so only the plain
		// insert-if-absent path below can ever be reached.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		let key = key.encode_owned()?;
		let val = val.into();
		// Only write when no value is currently stored under the key.
		match self.inner.get(&key)? {
			None => self.inner.set(&key, &val)?,
			_ => return Err(Error::TxKeyAlreadyExists),
		}
		Ok(())
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
	where
		K: KeyEncode + Sprintable + Debug,
		V: Into<Val> + Debug,
	{
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		let key = key.encode_owned()?;
		let val = val.into();
		let chk = chk.map(Into::into);
		match (self.inner.get(&key)?, chk) {
			(Some(v), Some(w)) if v == w => self.inner.set(&key, &val)?,
			(None, None) => self.inner.set(&key, &val)?,
			_ => return Err(Error::TxConditionNotMet),
		};
		Ok(())
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn del<K>(&mut self, key: K) -> Result<(), Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		self.inner.delete(&key.encode()?)?;
		Ok(())
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
	async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
	where
		K: KeyEncode + Sprintable + Debug,
		V: Into<Val> + Debug,
	{
		if self.done {
			return Err(Error::TxFinished);
		}
		if !self.write {
			return Err(Error::TxReadonly);
		}
		let key = key.encode()?;
		let chk = chk.map(Into::into);
		match (self.inner.get(&key)?, chk) {
			(Some(v), Some(w)) if v == w => self.inner.delete(&key)?,
			(None, None) => self.inner.delete(&key)?,
			_ => return Err(Error::TxConditionNotMet),
		};
		Ok(())
	}
	/// Retrieves up to `limit` keys from the given key range.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set and
	/// [`Error::TxFinished`] if the transaction has already been finished.
	/// Key encoding errors are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
	async fn keys<K>(
		&mut self,
		rng: Range<K>,
		limit: u32,
		version: Option<u64>,
	) -> Result<Vec<Key>, Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		// Versioned queries are rejected before anything else is checked.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		// Encode both bounds of the range before querying the store.
		let lower = rng.start.encode()?;
		let upper = rng.end.encode()?;
		let max = Some(limit as usize);
		let found = self.inner.keys(lower.as_slice()..upper.as_slice(), max);
		Ok(found.map(Key::from).collect())
	}
	/// Retrieves up to `limit` key-value pairs from the given key range.
	///
	/// # Errors
	///
	/// Returns [`Error::UnsupportedVersionedQueries`] if `version` is set and
	/// [`Error::TxFinished`] if the transaction has already been finished.
	/// Key encoding errors and errors yielded by the scan are propagated.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
	async fn scan<K>(
		&mut self,
		rng: Range<K>,
		limit: u32,
		version: Option<u64>,
	) -> Result<Vec<(Key, Val)>, Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		// Versioned scans are rejected up front, so only the plain scan
		// below can ever be reached.
		if version.is_some() {
			return Err(Error::UnsupportedVersionedQueries);
		}
		if self.done {
			return Err(Error::TxFinished);
		}
		let beg = rng.start.encode()?;
		let end = rng.end.encode()?;
		let range = beg.as_slice()..end.as_slice();
		// The third element of each scanned entry is not part of the result.
		self.inner
			.scan(range, Some(limit as usize))
			.map(|r| r.map(|(k, v, _)| (k.to_vec(), v)).map_err(Into::into))
			.collect()
	}
	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
	async fn scan_all_versions<K>(
		&mut self,
		rng: Range<K>,
		limit: u32,
	) -> Result<Vec<(Key, Val, Version, bool)>, Error>
	where
		K: KeyEncode + Sprintable + Debug,
	{
		if self.done {
			return Err(Error::TxFinished);
		}
		let beg = rng.start.encode()?;
		let end = rng.end.encode()?;
		let range = beg.as_slice()..end.as_slice();
		self.inner
			.scan_all_versions(range, Some(limit as usize))
			.map(|r| r.map(|(k, v, ts, del)| (k.to_vec(), v, ts, del)).map_err(Into::into))
			.collect()
	}
}
impl Transaction {
	/// Sets a savepoint on the underlying transaction.
	///
	/// The result of setting the savepoint is deliberately discarded, so this
	/// never reports a failure.
	pub(crate) fn new_save_point(&mut self) {
		let _ = self.inner.set_savepoint();
	}
	/// Rolls the underlying transaction back to its savepoint.
	///
	/// # Errors
	///
	/// Returns the converted error of the underlying rollback if it fails.
	pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
		match self.inner.rollback_to_savepoint() {
			Ok(_) => Ok(()),
			Err(e) => Err(e.into()),
		}
	}
	/// Releases the last savepoint.
	///
	/// This is a no-op that always returns `Ok(())`.
	pub(crate) fn release_last_save_point(&mut self) -> Result<(), Error> {
		Ok(())
	}
}