use std::fmt;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use crate::config::DaemonServer;
use crate::error::Error;
use crate::protocol::{IdAllocator, Request, Response};
use crate::transport::{self, ConnectedDispatcher, Dispatcher, DispatcherHandle};
/// Trace verbosity that can be requested through `Job::set_trace`.
///
/// Every variant corresponds to one upper-case string, produced by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceLevel {
    /// Rendered as `"OFF"`.
    Off,
    /// Rendered as `"ERRORS"`.
    Errors,
    /// Rendered as `"DATASTREAM"`.
    Datastream,
    /// Rendered as `"ALL"`.
    All,
}

impl TraceLevel {
    /// Returns the string that `Job::set_trace` places in the `tracelevel`
    /// field of its `SetConfig` request.
    fn as_str(self) -> &'static str {
        match self {
            Self::All => "ALL",
            Self::Datastream => "DATASTREAM",
            Self::Errors => "ERRORS",
            Self::Off => "OFF",
        }
    }
}
/// State that `Job` keeps behind an `Arc`; it is filled in by `Job::connect`.
pub(crate) struct JobInner {
/// Handle used to send requests to the dispatcher (see `Job::send`).
pub(crate) handle: DispatcherHandle,
/// Source of request ids: every request built in this file takes its id from `ids.next()`.
pub(crate) ids: Arc<IdAllocator>,
/// Version string returned by `transport::connect`; exposed through `Job::version`.
pub(crate) version: String,
/// Initial job string returned by `transport::connect`; exposed through `Job::initial_job`.
pub(crate) initial_job: String,
/// Counter handed over by `transport::connect` and read by `Job::in_flight`.
/// NOTE(review): presumably updated by the dispatcher as requests start and finish — confirm.
pub(crate) in_flight: Arc<AtomicU32>,
}
/// A connection created by `Job::connect`, through which requests are sent.
///
/// Dropping a `Job` triggers a best-effort `Exit` request (see the `Drop` impl in this file).
pub struct Job {
/// Shared state: dispatcher handle, id allocator, version, initial job and in-flight counter.
pub(crate) inner: Arc<JobInner>,
// Never read in this file; stored so the dispatcher value is owned by the `Job`.
// NOTE(review): presumably dropping it shuts the transport down — confirm in `transport`.
_dispatcher: Dispatcher,
}
impl fmt::Debug for Job {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Job")
.field("version", &self.inner.version)
.field("initial_job", &self.inner.initial_job)
.finish_non_exhaustive()
}
}
impl Job {
/// Connects to `server` via `transport::connect` and wraps the result in a `Job`.
///
/// The dispatcher returned by the transport is stored in the `Job`, and a
/// handle obtained from it is kept in the shared inner state for sending
/// requests.
///
/// # Errors
///
/// Propagates any error returned by `transport::connect`.
pub async fn connect(server: &DaemonServer) -> crate::Result<Self> {
    let connected = transport::connect(server).await?;
    let ConnectedDispatcher {
        dispatcher,
        version,
        initial_job,
        ids,
        in_flight,
    } = connected;
    let shared = JobInner {
        handle: dispatcher.handle(),
        ids: Arc::new(ids),
        version,
        initial_job,
        in_flight,
    };
    Ok(Self {
        inner: Arc::new(shared),
        _dispatcher: dispatcher,
    })
}
/// Returns the version string captured when the connection was established.
#[must_use]
pub fn version(&self) -> &str {
    self.inner.version.as_str()
}
/// Returns the initial job string captured when the connection was established.
#[must_use]
pub fn initial_job(&self) -> &str {
    self.inner.initial_job.as_str()
}
/// Forwards `request` to the dispatcher handle and awaits its response.
///
/// # Errors
///
/// Returns whatever error the handle's `send` produces.
pub(crate) async fn send(&self, request: Request) -> crate::Result<Response> {
    let dispatcher = &self.inner.handle;
    dispatcher.send(request).await
}
/// Borrows the id allocator that this job uses for its requests.
#[must_use]
pub fn ids(&self) -> &IdAllocator {
    let allocator: &IdAllocator = &self.inner.ids;
    allocator
}
/// Returns a clone of the dispatcher handle held by this job.
#[allow(dead_code)]
pub(crate) fn handle(&self) -> DispatcherHandle {
    Clone::clone(&self.inner.handle)
}
#[must_use]
pub fn in_flight(&self) -> u32 {
self.inner
.in_flight
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Executes `sql` without parameters.
///
/// Delegates to `execute_inner` with no parameter list. When the `metrics`
/// feature is enabled, the elapsed time is passed to
/// `record_execute_latency` whether the call succeeded or failed.
///
/// # Errors
///
/// Returns any error produced by `execute_inner`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job, sql = %sql))
)]
pub async fn execute(&self, sql: &str) -> crate::Result<crate::query::Rows> {
    #[cfg(feature = "metrics")]
    let started_at = std::time::Instant::now();

    let outcome = self.execute_inner(sql, None).await;

    #[cfg(feature = "metrics")]
    record_execute_latency(started_at);

    outcome
}
/// Executes `sql` with the given `params`.
///
/// The parameter slice is copied into an owned `Vec` and passed to
/// `execute_inner`. When the `metrics` feature is enabled, the elapsed time
/// is passed to `record_execute_latency` whether the call succeeded or failed.
///
/// # Errors
///
/// Returns any error produced by `execute_inner`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        skip(self, params),
        fields(
            job_id = %self.inner.initial_job,
            sql = %sql,
            param_count = params.len(),
        )
    )
)]
pub async fn execute_with(
    &self,
    sql: &str,
    params: &[serde_json::Value],
) -> crate::Result<crate::query::Rows> {
    #[cfg(feature = "metrics")]
    let started_at = std::time::Instant::now();

    let owned_params = params.to_vec();
    let outcome = self.execute_inner(sql, Some(owned_params)).await;

    #[cfg(feature = "metrics")]
    record_execute_latency(started_at);

    outcome
}
async fn execute_inner(
&self,
sql: &str,
params: Option<Vec<serde_json::Value>>,
) -> crate::Result<crate::query::Rows> {
let id = self.inner.ids.next();
let request = Request::Sql {
id: id.clone(),
sql: sql.to_owned(),
rows: None,
parameters: params,
};
let resp = self.send(request).await?;
match resp {
Response::QueryResult(q) if q.id == id => {
Ok(crate::query::Rows::new(q, self.inner.handle.clone()))
}
Response::Error(e) => Err(crate::job_helpers::server_error(e)),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job, sql = %sql))
)]
pub async fn prepare(&self, sql: &str) -> crate::Result<crate::query::Query> {
let id = self.inner.ids.next();
let resp = self
.send(Request::PrepareSql {
id: id.clone(),
sql: sql.to_owned(),
})
.await?;
match resp {
Response::PreparedStatement {
id: got, cont_id, ..
} if got == id => Ok(crate::query::Query::new(cont_id, self.inner.handle.clone())),
Response::Error(e) => Err(crate::job_helpers::server_error(e)),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job))
)]
pub async fn ping(&self) -> crate::Result<std::time::Duration> {
let id = self.inner.ids.next();
let start = std::time::Instant::now();
let resp = self.send(Request::Ping { id: id.clone() }).await?;
match resp {
Response::Pong { id: got } if got == id => Ok(start.elapsed()),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job))
)]
pub async fn server_version(&self) -> crate::Result<String> {
let id = self.inner.ids.next();
let resp = self.send(Request::GetVersion { id: id.clone() }).await?;
match resp {
Response::Version {
id: got,
success,
version,
..
} if got == id => {
if success {
Ok(version)
} else {
Err(crate::job_helpers::server_failed("server_version"))
}
}
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job))
)]
pub async fn db_job_name(&self) -> crate::Result<String> {
let id = self.inner.ids.next();
let resp = self.send(Request::GetDbJob { id: id.clone() }).await?;
match resp {
Response::DbJob {
id: got,
success,
job,
..
} if got == id => {
if success {
Ok(job)
} else {
Err(crate::job_helpers::server_failed("db_job_name"))
}
}
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job, level = ?level))
)]
pub async fn set_trace(&self, level: TraceLevel) -> crate::Result<()> {
let id = self.inner.ids.next();
let resp = self
.send(Request::SetConfig {
id: id.clone(),
tracelevel: level.as_str().to_owned(),
tracedest: String::new(),
})
.await?;
match resp {
Response::ConfigSet {
id: got, success, ..
} if got == id => {
if success {
Ok(())
} else {
Err(crate::job_helpers::server_failed("set_trace"))
}
}
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job))
)]
pub async fn fetch_trace(&self) -> crate::Result<String> {
let id = self.inner.ids.next();
let resp = self.send(Request::GetTraceData { id: id.clone() }).await?;
match resp {
Response::TraceData {
id: got,
success,
tracedata,
} if got == id => {
if success {
Ok(tracedata)
} else {
Err(crate::job_helpers::server_failed("fetch_trace"))
}
}
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job, sql = %sql))
)]
pub async fn visual_explain(&self, sql: &str) -> crate::Result<serde_json::Value> {
let id = self.inner.ids.next();
let resp = self
.send(Request::Dove {
id: id.clone(),
sql: sql.to_owned(),
})
.await?;
match resp {
Response::DoveResult {
id: got,
success,
result,
} if got == id => {
if success {
Ok(result)
} else {
Err(crate::job_helpers::server_failed("visual_explain"))
}
}
Response::Error(e) => Err(crate::job_helpers::server_error(e)),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self), fields(job_id = %self.inner.initial_job, command = %command))
)]
pub async fn cl(&self, command: &str) -> crate::Result<crate::protocol::ClMessage> {
let id = self.inner.ids.next();
let resp = self
.send(Request::Cl {
id: id.clone(),
cmd: command.to_owned(),
})
.await?;
match resp {
Response::ClResult {
id: got,
success,
messages,
..
} if got == id => {
if !success {
return Err(crate::job_helpers::server_failed("cl"));
}
messages.into_iter().next().ok_or_else(|| {
Error::Internal("daemon returned ClResult with no messages".to_string())
})
}
Response::Error(e) => Err(crate::job_helpers::server_error(e)),
ref other => Err(crate::job_helpers::unexpected(other)),
}
}
}
/// Records the time elapsed since `start`, in microseconds, into the
/// `JOB_EXECUTE_LATENCY_MICROS` histogram.
///
/// The microsecond count is clamped to `u64::MAX` if it does not fit in a
/// `u64` before being converted to `f64`.
#[cfg(feature = "metrics")]
fn record_execute_latency(start: std::time::Instant) {
    let raw_micros = start.elapsed().as_micros();
    let clamped = u64::try_from(raw_micros).unwrap_or(u64::MAX);
    // The histogram takes an `f64`; losing precision on very large values is accepted.
    #[allow(clippy::cast_precision_loss)]
    let sample = clamped as f64;
    metrics::histogram!(crate::observability::JOB_EXECUTE_LATENCY_MICROS).record(sample);
}
impl Drop for Job {
    /// Hands an `Exit` request to `job_helpers::spawn_best_effort`.
    ///
    /// The request uses a freshly allocated id and a clone of the dispatcher
    /// handle; the result of the send is deliberately discarded.
    fn drop(&mut self) {
        let shared = &self.inner;
        let exit_id = shared.ids.next();
        let dispatcher = shared.handle.clone();
        crate::job_helpers::spawn_best_effort(async move {
            let farewell = Request::Exit { id: exit_id };
            // Best effort only: there is nobody left to report a failure to.
            let _ = dispatcher.send(farewell).await;
        });
    }
}