#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
mod ent;
#[cfg(doc)]
use crate::proto::{BatchStatus, Progress, ProgressUpdate};
use super::utils::{get_env_url, url_parse};
use super::{single, Info, Push, QueueAction, QueueControl};
use super::{utils, PushBulk};
use crate::error::{self, Error};
use crate::{Job, Reconnect, WorkerId};
use std::collections::HashMap;
use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;
mod options;
pub(crate) use options::ClientOptions;
mod conn;
pub(crate) use conn::BoxedConnection;
pub use conn::Connection;
mod mutation;
/// Protocol version this client expects the server to announce in its greeting.
///
/// `check_protocols_match` compares the announced version against this value.
pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2;
fn check_protocols_match(ver: usize) -> Result<(), Error> {
if ver != EXPECTED_PROTOCOL_VERSION {
return Err(error::Connect::VersionMismatch {
ours: EXPECTED_PROTOCOL_VERSION,
theirs: ver,
}
.into());
}
Ok(())
}
/// A client connection to a Faktory server.
///
/// Bundles the underlying connection with the options used for the handshake,
/// so the same options can be re-applied when the connection is re-established.
pub struct Client {
/// Boxed connection that commands are written to and replies are read from.
stream: BoxedConnection,
/// Options consulted during the handshake in `init` (password, worker identity, labels).
opts: ClientOptions,
}
impl Client {
    /// Builds a brand-new [`Client`] on a freshly re-established connection.
    ///
    /// The new connection is obtained from the current stream's `reconnect`
    /// and the new client is handshaken with a clone of this client's options.
    /// This client's own stream field is not replaced.
    pub(crate) async fn connect_again(&mut self) -> Result<Self, Error> {
        let fresh_stream = self.stream.reconnect().await?;
        let opts = self.opts.clone();
        Client::new(fresh_stream, opts).await
    }

    /// Replaces this client's stream with a re-established one and repeats
    /// the handshake on it.
    pub(crate) async fn reconnect(&mut self) -> Result<(), Error> {
        let fresh_stream = self.stream.reconnect().await?;
        self.stream = fresh_stream;
        self.init().await
    }
}
impl Drop for Client {
    /// Makes a best-effort attempt to send the `End` command before the
    /// client goes away.
    ///
    /// `drop` cannot be `async`, so the write is driven to completion on the
    /// current thread via `block_in_place` + `Handle::block_on`.
    ///
    /// Failures are deliberately ignored: a destructor has no way to report
    /// them, and panicking here (e.g. because the server already closed the
    /// socket) would abort the process if it happened during unwinding.
    ///
    /// # Panics
    ///
    /// `tokio::task::block_in_place` panics when invoked from within a
    /// current-thread Tokio runtime.
    fn drop(&mut self) {
        // Without a runtime there is nothing that could drive the async write,
        // so skip the farewell instead of panicking inside a destructor.
        let handle = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(_) => return,
        };
        tokio::task::block_in_place(|| {
            handle.block_on(async {
                // Best effort only: the peer may already be gone.
                let _ = single::write_command(&mut self.stream, &single::End).await;
            })
        });
    }
}
/// Outcome of a heartbeat exchange, as decoded by `Client::heartbeat`.
pub(crate) enum HeartbeatStatus {
/// Reading the reply yielded no JSON value.
Ok,
/// The reply's `state` field was `"terminate"`.
Terminate,
/// The reply's `state` field was `"quiet"`.
Quiet,
}
impl Client {
pub async fn connect_with<S>(stream: S, pwd: Option<String>) -> Result<Client, Error>
where
S: AsyncBufRead + AsyncWrite + Reconnect + Send + Sync + Unpin + 'static,
{
let opts = ClientOptions {
password: pwd,
..Default::default()
};
Client::new(Box::new(stream), opts).await
}
}
impl Client {
    /// Connects to the server whose address is provided by `get_env_url`.
    pub async fn connect() -> Result<Client, Error> {
        Self::connect_to(&get_env_url()).await
    }

    /// Connects to the server at `addr`.
    ///
    /// `addr` is parsed as a URL; a TCP connection is opened to the host
    /// derived from it, wrapped in a buffered stream, and handed to
    /// [`Client::connect_with`] together with the URL's password (if any).
    ///
    /// # Errors
    ///
    /// Fails if `addr` cannot be parsed, the TCP connection cannot be
    /// established, or the handshake fails.
    pub async fn connect_to(addr: &str) -> Result<Client, Error> {
        let url = url_parse(addr)?;
        let host = utils::host_from_url(&url);
        let socket = TokioStream::connect(host).await?;
        let pwd = url.password().map(ToString::to_string);
        Self::connect_with(BufStream::new(socket), pwd).await
    }
}
impl Client {
async fn init(&mut self) -> Result<(), Error> {
let hi = single::read_hi(&mut self.stream).await?;
check_protocols_match(hi.version)?;
let mut hello = single::Hello::default();
if hi.salt.is_some() {
if let Some(ref pwd) = self.opts.password {
hello.set_password(&hi, pwd);
} else {
return Err(error::Connect::AuthenticationNeeded.into());
}
}
if self.opts.is_worker {
let hostname = self
.opts
.hostname
.clone()
.or_else(|| hostname::get().ok()?.into_string().ok())
.unwrap_or_else(|| "local".to_string());
self.opts.hostname = Some(hostname);
let pid = self.opts.pid.unwrap_or_else(|| std::process::id() as usize);
self.opts.pid = Some(pid);
let wid = self.opts.wid.clone().unwrap_or_else(WorkerId::random);
self.opts.wid = Some(wid);
hello.hostname = Some(self.opts.hostname.clone().unwrap());
hello.wid = Some(self.opts.wid.clone().unwrap());
hello.pid = Some(self.opts.pid.unwrap());
hello.labels.clone_from(&self.opts.labels);
}
single::write_command_and_await_ok(&mut self.stream, &hello).await?;
Ok(())
}
pub(crate) async fn new(stream: BoxedConnection, opts: ClientOptions) -> Result<Client, Error> {
let mut c = Client { stream, opts };
c.init().await?;
Ok(c)
}
pub(crate) async fn issue<FC: single::FaktoryCommand>(
&mut self,
c: &FC,
) -> Result<ReadToken<'_>, Error> {
single::write_command(&mut self.stream, c).await?;
Ok(ReadToken(self))
}
pub(crate) async fn fetch<Q>(&mut self, queues: &[Q]) -> Result<Option<Job>, Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&single::Fetch::from(queues))
.await?
.read_json()
.await
}
pub(crate) async fn heartbeat(&mut self) -> Result<HeartbeatStatus, Error> {
single::write_command(
&mut self.stream,
&single::Heartbeat::new(self.opts.wid.as_ref().unwrap().clone()),
)
.await?;
match single::read_json::<_, serde_json::Value>(&mut self.stream).await? {
None => Ok(HeartbeatStatus::Ok),
Some(s) => match s
.as_object()
.and_then(|m| m.get("state"))
.and_then(|s| s.as_str())
{
Some("terminate") => Ok(HeartbeatStatus::Terminate),
Some("quiet") => Ok(HeartbeatStatus::Quiet),
_ => Err(error::Protocol::BadType {
expected: "heartbeat response",
received: format!("{}", s),
}
.into()),
},
}
}
pub(crate) async fn perform_queue_action<Q>(
&mut self,
queues: &[Q],
action: QueueAction,
) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(action, queues))
.await?
.read_ok()
.await
}
}
impl Client {
pub async fn enqueue(&mut self, job: Job) -> Result<(), Error> {
self.issue(&Push::from(job)).await?.read_ok().await
}
pub async fn enqueue_many<J>(
&mut self,
jobs: J,
) -> Result<(usize, Option<HashMap<String, String>>), Error>
where
J: IntoIterator<Item = Job>,
J::IntoIter: ExactSizeIterator,
{
let jobs = jobs.into_iter();
let jobs_count = jobs.len();
let errors: HashMap<String, String> = self
.issue(&PushBulk::from(jobs.collect::<Vec<_>>()))
.await?
.read_json()
.await?
.expect("Faktory server sends {} literal when there are no errors");
if errors.is_empty() {
return Ok((jobs_count, None));
}
Ok((jobs_count - errors.len(), Some(errors)))
}
pub async fn current_info(&mut self) -> Result<single::FaktoryState, Error> {
self.issue(&Info)
.await?
.read_json()
.await
.map(|v| v.expect("info command cannot give empty response"))
}
pub async fn queue_pause<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.perform_queue_action(queues, QueueAction::Pause).await
}
pub async fn queue_pause_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Pause).await
}
pub async fn queue_resume<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.perform_queue_action(queues, QueueAction::Resume).await
}
pub async fn queue_resume_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Resume).await
}
pub async fn queue_remove<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.perform_queue_action(queues, QueueAction::Remove).await
}
pub async fn queue_remove_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Remove).await
}
}
/// Handle returned by `Client::issue` once a command has been written.
///
/// It mutably borrows the client; consuming it with `read_ok` or `read_json`
/// reads the reply from the client's stream.
pub struct ReadToken<'a>(pub(crate) &'a mut Client);
impl ReadToken<'_> {
pub(crate) async fn read_ok(self) -> Result<(), Error> {
single::read_ok(&mut self.0.stream).await
}
pub(crate) async fn read_json<T>(self) -> Result<Option<T>, Error>
where
T: serde::de::DeserializeOwned,
{
single::read_json(&mut self.0.stream).await
}
}