pub mod cmd;
mod err;
mod proto;
mod tools;
pub mod types;
#[macro_use]
mod rjson_macros;
use async_net::TcpStream;
use cmd::args::{Args, ArgsWithOpt};
use cmd::run::Response;
use dashmap::DashMap;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::lock::Mutex;
use proto::Payload;
use ql2::query::QueryType;
use ql2::response::ResponseType;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::borrow::Cow;
use std::ops::Drop;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tools::StaticString;
use tracing::trace;
pub use cmd::func::Func;
pub use err::*;
pub use proto::{Command, Datum};
pub use types::DateTime;
pub use unreql_macros::func;
/// Process-wide counter behind [`var_counter`]; its initial value is 1.
///
/// NOTE(review): presumably consumed by the `func!` macro to number closure
/// variables — confirm against `unreql_macros`.
#[doc(hidden)]
pub static VAR_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the counter's current value and bumps it by one for the next caller.
#[doc(hidden)]
pub fn var_counter() -> u64 {
    // `fetch_add` yields the value held *before* the increment.
    AtomicU64::fetch_add(&VAR_COUNTER, 1, Ordering::SeqCst)
}
/// Test-only peek at `VAR_COUNTER`: reads the value without advancing it.
#[cfg(test)]
// Not every test configuration calls this helper, hence the lint exemption.
#[allow(dead_code)]
fn current_counter() -> u64 {
    AtomicU64::load(&VAR_COUNTER, Ordering::SeqCst)
}
/// Crate-wide result alias: `std::result::Result` with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
// Sending half of an unbounded channel whose items are `Result<(ResponseType, Response)>`.
type Sender = UnboundedSender<Result<(ResponseType, Response)>>;
// Receiving half of an unbounded channel with the same item type as `Sender`.
type Receiver = UnboundedReceiver<Result<(ResponseType, Response)>>;
// State that `Session` holds behind an `Arc`, so every clone of a session
// sees the same values.
#[derive(Debug)]
struct InnerSession {
// Database name; overwritten by `Session::use_`.
db: Mutex<Cow<'static, str>>,
// TCP stream guarded by an async mutex.
// NOTE(review): presumably the single socket to the server — confirm in `cmd::connect`.
stream: Mutex<TcpStream>,
// Response senders keyed by token: `Session::connection` inserts an entry and
// `Connection`'s `Drop` removes it.
channels: DashMap<u64, Sender>,
// Counter that `token()` reads and increments.
token: AtomicU64,
// When true, `broken()` returns `ConnectionBroken`.
broken: AtomicBool,
// When true, `change_feed()` returns `ConnectionLocked`, which makes
// `Session::connection` refuse to hand out new connections.
change_feed: AtomicBool,
}
impl InnerSession {
    /// Hands out the next token and advances the counter.
    ///
    /// Returns the value the counter held *before* the increment. If that value
    /// is `u64::MAX` the token space is exhausted, so the session is marked
    /// broken; the caller still receives `u64::MAX`.
    fn token(&self) -> u64 {
        // `fetch_add` wraps on overflow. The previous `fetch_update` closure
        // computed `x + 1`, which panics at `u64::MAX` when overflow checks are
        // enabled and made the exhaustion branch below unreachable there.
        let token = self.token.fetch_add(1, Ordering::SeqCst);
        if token == u64::MAX {
            self.mark_broken();
        }
        token
    }

    /// Flags the session as broken. Nothing in this impl clears the flag again.
    fn mark_broken(&self) {
        self.broken.store(true, Ordering::SeqCst);
    }

    /// Guard: returns `Err(ConnectionBroken)` once the broken flag is set.
    fn broken(&self) -> Result<()> {
        if self.broken.load(Ordering::SeqCst) {
            return Err(err::Driver::ConnectionBroken.into());
        }
        Ok(())
    }

    /// Sets the change-feed flag.
    fn mark_change_feed(&self) {
        self.change_feed.store(true, Ordering::SeqCst);
    }

    /// Clears the change-feed flag.
    fn unmark_change_feed(&self) {
        self.change_feed.store(false, Ordering::SeqCst);
    }

    /// Reports whether the change-feed flag is currently set.
    fn is_change_feed(&self) -> bool {
        self.change_feed.load(Ordering::SeqCst)
    }

    /// Guard: returns `Err(ConnectionLocked)` while the change-feed flag is set.
    fn change_feed(&self) -> Result<()> {
        if self.is_change_feed() {
            return Err(err::Driver::ConnectionLocked.into());
        }
        Ok(())
    }
}
/// Handle to a shared session.
///
/// The only field is an `Arc`, so cloning a `Session` produces another handle
/// to the same underlying state rather than a new session.
#[derive(Debug, Clone)]
pub struct Session {
// Shared state; see `InnerSession`.
inner: Arc<InnerSession>,
}
impl Session {
    /// Allocates a token, registers a response channel for it and returns a
    /// [`Connection`] bound to both.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionBroken` if the session is marked broken, or
    /// `ConnectionLocked` while the change-feed flag is set.
    pub fn connection(&self) -> Result<Connection> {
        let inner = &self.inner;
        inner.broken()?;
        inner.change_feed()?;
        // A token is consumed only after both guards have passed.
        let token = inner.token();
        let (sender, receiver) = mpsc::unbounded();
        inner.channels.insert(token, sender);
        Ok(Connection::new(self.clone(), receiver, token))
    }

    /// Replaces the session's database name with `db_name`.
    pub async fn use_<T>(&mut self, db_name: T)
    where
        T: StaticString,
    {
        // Convert first, then take the lock, so the guard is held only for the store.
        let name = db_name.static_string();
        *self.inner.db.lock().await = name;
    }

    /// Sends a `NoreplyWait` query on a fresh connection and waits for its response.
    ///
    /// # Errors
    ///
    /// Propagates failures from [`Session::connection`] and from the request itself.
    pub async fn noreply_wait(&self) -> Result<()> {
        let mut conn = self.connection()?;
        let payload = Payload(QueryType::NoreplyWait, None, Default::default());
        trace!(
            "waiting for noreply operations to finish; token: {}",
            conn.token
        );
        let (response_type, _) = conn.request(&payload, false).await?;
        trace!(
            "session.noreply_wait() run; token: {}, response type: {:?}",
            conn.token,
            response_type,
        );
        Ok(())
    }

    /// Sends a `ServerInfo` query on a fresh connection and decodes the reply.
    ///
    /// The reply is deserialized as a list of [`ServerInfo`]; its last element
    /// is returned.
    ///
    /// # Errors
    ///
    /// Propagates connection, request and deserialization failures, and returns
    /// `Driver::Other` when the decoded list is empty.
    pub async fn server(&self) -> Result<ServerInfo> {
        let mut conn = self.connection()?;
        let payload = Payload(QueryType::ServerInfo, None, Default::default());
        trace!("retrieving server information; token: {}", conn.token);
        let (response_type, response) = conn.request(&payload, false).await?;
        trace!(
            "session.server() run; token: {}, response type: {:?}",
            conn.token,
            response_type,
        );
        let mut infos: Vec<ServerInfo> = serde_json::from_value(response.r)?;
        match infos.pop() {
            Some(info) => Ok(info),
            None => Err(Driver::Other("server info is empty".into()).into()),
        }
    }

    /// Reports whether the session's broken flag is set.
    #[doc(hidden)]
    pub fn is_broken(&self) -> bool {
        let flag = &self.inner.broken;
        flag.load(Ordering::SeqCst)
    }
}
/// Server description decoded from the reply to a `ServerInfo` query
/// (see `Session::server`).
///
/// The derived `Ord`/`PartialOrd` compare fields in declaration order, so the
/// field order below is part of the type's behavior.
#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq, Ord, PartialOrd, Hash)]
// Callers outside this crate cannot construct or exhaustively destructure the
// struct, which leaves room to add fields later.
#[non_exhaustive]
pub struct ServerInfo {
/// The `id` value from the server's reply.
pub id: String,
/// The `proxy` value from the server's reply.
pub proxy: bool,
/// The `name` value from the server's reply; optional in the decoded data.
pub name: Option<String>,
}
/// A token plus the receiving end of the response channel registered for it.
///
/// Created by `Session::connection`. Clones share the same receiver and
/// `closed` flag because both live behind an `Arc`.
#[derive(Debug, Clone)]
pub struct Connection {
// Handle to the session this connection was created from.
session: Session,
// Receiving half of the channel registered under `token` in the session.
rx: Arc<Mutex<Receiver>>,
// Token allocated for this connection by the session.
token: u64,
// Starts as false; `close` sets it to true before sending a `Stop` query.
closed: Arc<AtomicBool>,
}
impl Connection {
    /// Wraps the receiver for `token` in a new, not-yet-closed connection.
    fn new(session: Session, rx: Receiver, token: u64) -> Connection {
        let rx = Arc::new(Mutex::new(rx));
        let closed = Arc::new(AtomicBool::new(false));
        Self {
            session,
            rx,
            token,
            closed,
        }
    }

    /// Stops a changefeed by sending a `Stop` query for this connection's token.
    ///
    /// When the session's change-feed flag is not set this is a no-op that
    /// returns `Ok(())`. Otherwise the connection is flagged closed, the `Stop`
    /// query is sent, and the change-feed flag is cleared once a response
    /// arrives. If `arg.noreply_wait()` is false, the query carries
    /// `{ "noreply": false }`; if true, it carries no argument.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the request. In that case the change-feed
    /// flag is not cleared here and the connection stays flagged closed.
    pub async fn close<T>(&mut self, arg: T) -> Result<()>
    where
        T: cmd::close::Arg,
    {
        let on_change_feed = self.session.inner.is_change_feed();
        if !on_change_feed {
            trace!(
                "ignoring conn.close() called on a normal connection; token: {}",
                self.token
            );
            return Ok(());
        }

        // Flag the connection before the request goes out.
        self.set_closed(true);
        let stop_arg = (!arg.noreply_wait()).then(|| r.expr(json!({ "noreply": false })));
        let payload = Payload(QueryType::Stop, stop_arg.as_ref(), Default::default());
        trace!("closing a changefeed; token: {}", self.token);
        let (response_type, _) = self.request(&payload, false).await?;
        self.session.inner.unmark_change_feed();
        trace!(
            "conn.close() run; token: {}, response type: {:?}",
            self.token,
            response_type,
        );
        Ok(())
    }

    /// Reports whether this connection (or any clone of it) has been flagged closed.
    fn closed(&self) -> bool {
        let flag = &self.closed;
        flag.load(Ordering::SeqCst)
    }

    /// Overwrites the shared closed flag.
    fn set_closed(&self, closed: bool) {
        let flag = &self.closed;
        flag.store(closed, Ordering::SeqCst);
    }
}
impl Drop for Connection {
    /// Removes this token's sender from the session and clears the change-feed
    /// flag if it is set.
    ///
    /// NOTE(review): `Connection` derives `Clone`, so this runs when *any* clone
    /// is dropped, not only the last one — confirm that is intended.
    fn drop(&mut self) {
        let inner = &self.session.inner;
        inner.channels.remove(&self.token);
        if inner.is_change_feed() {
            inner.unmark_change_feed();
        }
    }
}
/// Unit struct whose methods are the entry points of the API
/// (`r.connect(..)`, `r.expr(..)`, `r.args(..)`, `r.with_opt(..)`).
// The lower-case type name is deliberate so call sites read `r.<method>(..)`;
// the lint is silenced for that reason.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug)]
pub struct r;
impl r {
    /// Opens a session using the connect options derived from `options`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `cmd::connect::new` reports.
    pub async fn connect<T>(self, options: T) -> Result<Session>
    where
        T: cmd::connect::Arg,
    {
        let opts = options.into_connect_opts();
        cmd::connect::new(opts).await
    }

    /// Builds a [`Command`] from any serializable value via `Command::from_json`.
    pub fn expr(self, arg: impl Serialize) -> Command {
        Command::from_json(arg)
    }

    /// Wraps `arg` in [`Args`].
    pub fn args<T>(self, arg: T) -> Args<T> {
        Args(arg)
    }

    /// Pairs `arg` with the options `opt` in an [`ArgsWithOpt`].
    pub fn with_opt<T, P>(self, arg: T, opt: P) -> ArgsWithOpt<T, P> {
        ArgsWithOpt(arg, opt)
    }
}
/// Accepts a query-building closure and discards it without calling it.
///
/// The bounds require the closure to take `r` and a mutable [`Session`]
/// reference and to return a stream of `Result<()>` items.
///
/// NOTE(review): presumably exists so documentation examples can be
/// type-checked without a live server — confirm.
#[doc(hidden)]
pub fn example<'a, Q, R>(_query: Q)
where
    Q: FnOnce(r, &'a mut Session) -> R,
    R: futures::Stream<Item = Result<()>>,
{
    // Intentionally empty: only the signature matters.
}