pub struct Client { /* private fields */ }
Expand description
Client for interacting with Gearman service
Both workers and clients will use this as the top-level object to communicate with a gearman server. Client::new should produce a functioning structure which should then be configured as needed. It will not do anything useful until after Client::connect has been called.
See examples for more information on how to use it.
Implementations§
source§impl Client
impl Client
sourcepub fn new() -> Client
pub fn new() -> Client
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = Client::new()
.add_server("127.0.0.1:4730")
.add_server("127.0.0.1:4731")
.connect()
.await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let mut jobs = Vec::new();
for x in 0..10 {
let payload = format!("payload{}", x);
jobs.push(client.submit("reverse", payload.as_bytes()).await?);
}
println!(
"Submitted {}",
jobs.iter().map(|j| format!("{}", j)).collect::<String>()
);
for job in jobs.iter_mut() {
let response = job.response().await;
println!("Response for [{:?}] is [{:?}]", job.handle(), response)
}
Ok(())
}
More examples
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}
sourcepub fn add_server(self, server: &str) -> Self
pub fn add_server(self, server: &str) -> Self
Add a server to the client. This does not initiate anything, it just configures the client.
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = Client::new()
.add_server("127.0.0.1:4730")
.add_server("127.0.0.1:4731")
.connect()
.await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let mut jobs = Vec::new();
for x in 0..10 {
let payload = format!("payload{}", x);
jobs.push(client.submit("reverse", payload.as_bytes()).await?);
}
println!(
"Submitted {}",
jobs.iter().map(|j| format!("{}", j)).collect::<String>()
);
for job in jobs.iter_mut() {
let response = job.response().await;
println!("Response for [{:?}] is [{:?}]", job.handle(), response)
}
Ok(())
}
More examples
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}
sourcepub fn set_client_id(self, client_id: &'static str) -> Self
pub fn set_client_id(self, client_id: &'static str) -> Self
Configures the client ID for this client
This has no effect if called after connect()
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}
sourcepub fn active_servers(&self) -> Vec<Hostname>
pub fn active_servers(&self) -> Vec<Hostname>
Returns a Vec of references to strings corresponding to only active servers
sourcepub async fn connect(self) -> Result<Self, Box<dyn Error>>
pub async fn connect(self) -> Result<Self, Box<dyn Error>>
Blocks until all servers added via [Client.add_server] are connected
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = Client::new()
.add_server("127.0.0.1:4730")
.add_server("127.0.0.1:4731")
.connect()
.await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let mut jobs = Vec::new();
for x in 0..10 {
let payload = format!("payload{}", x);
jobs.push(client.submit("reverse", payload.as_bytes()).await?);
}
println!(
"Submitted {}",
jobs.iter().map(|j| format!("{}", j)).collect::<String>()
);
for job in jobs.iter_mut() {
let response = job.response().await;
println!("Response for [{:?}] is [{:?}]", job.handle(), response)
}
Ok(())
}
More examples
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}
sourcepub async fn echo(&mut self, payload: &[u8]) -> Result<(), Error>
pub async fn echo(&mut self, payload: &[u8]) -> Result<(), Error>
Sends an ECHO_REQ to the first server, a good way to confirm the connection is alive
Returns an error if there aren’t any connected servers, or no ECHO_RES comes back
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = Client::new()
.add_server("127.0.0.1:4730")
.add_server("127.0.0.1:4731")
.connect()
.await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let mut jobs = Vec::new();
for x in 0..10 {
let payload = format!("payload{}", x);
jobs.push(client.submit("reverse", payload.as_bytes()).await?);
}
println!(
"Submitted {}",
jobs.iter().map(|j| format!("{}", j)).collect::<String>()
);
for job in jobs.iter_mut() {
let response = job.response().await;
println!("Response for [{:?}] is [{:?}]", job.handle(), response)
}
Ok(())
}
More examples
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
sourcepub async fn submit(
&mut self,
function: &str,
payload: &[u8]
) -> Result<ClientJob, Error>
pub async fn submit( &mut self, function: &str, payload: &[u8] ) -> Result<ClientJob, Error>
Submits a foreground job. The see ClientJob::response for how to see the response from the worker. The unique ID will be generated using Uuid::new_v4
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = Client::new()
.add_server("127.0.0.1:4730")
.add_server("127.0.0.1:4731")
.connect()
.await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let mut jobs = Vec::new();
for x in 0..10 {
let payload = format!("payload{}", x);
jobs.push(client.submit("reverse", payload.as_bytes()).await?);
}
println!(
"Submitted {}",
jobs.iter().map(|j| format!("{}", j)).collect::<String>()
);
for job in jobs.iter_mut() {
let response = job.response().await;
println!("Response for [{:?}] is [{:?}]", job.handle(), response)
}
Ok(())
}
More examples
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
sourcepub async fn submit_unique(
&mut self,
function: &str,
unique: &[u8],
payload: &[u8]
) -> Result<ClientJob, Error>
pub async fn submit_unique( &mut self, function: &str, unique: &[u8], payload: &[u8] ) -> Result<ClientJob, Error>
Submits a job with an explicit unique ID.
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
sourcepub async fn submit_background(
&mut self,
function: &str,
payload: &[u8]
) -> Result<ClientJob, Error>
pub async fn submit_background( &mut self, function: &str, payload: &[u8] ) -> Result<ClientJob, Error>
Submits a background job. The ClientJob returned won’t be able to use the ClientJob::response method because the server will never send packets for it.
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
sourcepub async fn submit_background_unique(
&mut self,
function: &str,
unique: &[u8],
payload: &[u8]
) -> Result<ClientJob, Error>
pub async fn submit_background_unique( &mut self, function: &str, unique: &[u8], payload: &[u8] ) -> Result<ClientJob, Error>
Submits a background job. The ClientJob returned won’t be able to use the ClientJob::response method because the server will never send packets for it.
sourcepub async fn get_status(
&mut self,
handle: &ServerHandle
) -> Result<JobStatus, Error>
pub async fn get_status( &mut self, handle: &ServerHandle ) -> Result<JobStatus, Error>
Sends a GET_STATUS packet and then returns the STATUS_RES in a JobStatus
Examples found in repository?
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let server = "127.0.0.1:4730";
let mut client = Client::new().add_server(server).connect().await?;
println!("Connected!");
println!("Echo: {:?}", client.echo(b"blah").await);
let job = client.submit_background("reverse", b"abcdefg").await?;
println!("Submitted {:?}", job.handle());
let status = client.get_status(job.handle()).await?;
println!("Status {:?}", status);
let mut job = client.submit("reverse", b"bloop").await?;
println!("Submitted {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
let mut job = client.submit_unique("reverse", b"bloop1", b"bloop").await?;
println!("Submitted unique {:?}", job.handle());
let response = job.response().await?;
println!("Got Back {:?}", response);
Ok(())
}
sourcepub async fn can_do<F>(self, function: &str, func: F) -> Result<Self, Error>
pub async fn can_do<F>(self, function: &str, func: F) -> Result<Self, Error>
Sends a CAN_DO on every connection and registers a callback for it
This informs the gearman server(s) of what “functions” your worker can perform, and it takes a closure which will be passed a mutable reference to jobs assigned to it. The function should return a vector of bytes to signal completion, that will trigger a WORK_COMPLETE packet to the server with the contents of the returned vector as the payload. If it returns an error, this will trigger a WORK_FAIL packet.
See examples/worker.rs for more information.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}
sourcepub async fn do_one_job(&mut self) -> Result<(), Error>
pub async fn do_one_job(&mut self) -> Result<(), Error>
Receive and do just one job. Will not return until a job is done or there is an error. This is called in a loop by Client::work.
sourcepub async fn work(self) -> Result<(), Error>
pub async fn work(self) -> Result<(), Error>
Run the assigned jobs through can_do functions until an error happens
After you have set up all functions your worker can do via the Client::can_do method, call this function to begin working. It will not return unless there is an unexpected error.
See examples/worker.rs for more information on how to use it.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let worker = Client::new();
worker
.add_server("127.0.0.1:4731")
.add_server("127.0.0.1:4730")
//.add_server("127.0.0.1:4731") Add all of your servers here
.set_client_id("example")
.connect()
.await
.expect("CONNECT failed")
.can_do("reverse", |job| {
let payload = String::from_utf8(job.payload().to_vec()).unwrap();
println!("reversing {}", payload);
let reversed: String = payload.chars().rev().collect();
let reversed: Vec<u8> = reversed.into_bytes();
Ok(reversed)
})
.await
.expect("CAN_DO reverse failed")
.can_do("alwaysfail", |_job| {
Err(io::Error::new(io::ErrorKind::Other, "Always fails"))
})
.await
.expect("CAN_DO alwaysfail failed")
//.can_do_async("status", status_user)
.work()
.await
.expect("WORK FAILED");
Ok(())
}