SyncClient

Struct SyncClient 

Source
pub struct SyncClient<T>
where T: Send + Sync + 'static,
{ /* private fields */ }
Expand description

§Sync Client

loop {
    let config = Config::new().unwrap();
    let state = Arc::new(Mutex::new(0));

    let mut client = SyncClient::new(config, state.clone());

    let rx = match client.connect() {
        Ok(rx) => rx,
        Err(e) => {
            println!("Connection error: {}", e);
            sleep(Duration::from_secs(1));
            continue;
        }
    };

    client.subscribe("topic", |state, msg| {
        let mut state = state.lock().unwrap();
        *state += 1;
        println!("Received message: {:?}", msg);
    })?;

    let should_exit;
    loop {
        let result = rx.try_recv();
        match result {
            Ok(AmqError::TcpServerClosed) => {
                should_exit = true;
                break;
            }
            Ok(e) => {
                println!("{}", e);
                should_exit = false;
                break;
            }
            Err(TryRecvError::Empty) => {
                sleep(Duration::from_secs(1));
                let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
            }
            Err(e) => {
                println!("Receive signal error: {:?}", e);
                should_exit = false;
                break;
            }
        }
    }

    if should_exit {
        break;
    }

    println!("Reconnecting...");
}

Implementations§

Source§

impl<T> Client<T>
where T: Send + Sync + 'static,

Source

pub fn new(config: Config, state: Arc<T>) -> Self

§Create a new client.
Examples found in repository?
examples/sync_pubsub.rs (line 17)
11async fn main() -> Result<(), Box<dyn Error>> {
12    let state = Arc::new(Mutex::new(0));
13
14    loop {
15        let config = Config::new().unwrap();
16
17        let mut client = SyncClient::new(config, state.clone());
18
19        let rx = match client.connect() {
20            Ok(rx) => rx,
21            Err(e) => {
22                println!("Connection error: {}", e);
23                sleep(Duration::from_secs(1));
24                continue;
25            }
26        };
27
28        client.subscribe("topic", |state, msg| {
29            let mut state = state.lock().unwrap();
30            *state += 1;
31            println!("Received message: {} {:?}", state, msg);
32        })?;
33
34        let should_exit;
35        loop {
36            let result = rx.try_recv();
37            match result {
38                Ok(AmqError::TcpServerClosed) => {
39                    should_exit = true;
40                    break;
41                }
42                Ok(e) => {
43                    println!("{}", e);
44                    should_exit = false;
45                    break;
46                }
47                Err(TryRecvError::Empty) => {
48                    sleep(Duration::from_secs(1));
49                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50                }
51                Err(e) => {
52                    println!("Receive signal error: {:?}", e);
53                    should_exit = false;
54                    break;
55                }
56            }
57        }
58
59        if should_exit {
60            break;
61        }
62
63        println!("Reconnecting...");
64    }
65
66    Ok(())
67}
Source

pub fn subscribe<F>(&mut self, topic: &str, f: F) -> Result<(), AmqError>
where F: Fn(Arc<T>, Vec<u8>) + Send + Sync + 'static,

§Subscribe to a topic.
client.subscribe("topic", |_state, msg| {
    println!("Received message: {:?}", msg);
}).unwrap();
Examples found in repository?
examples/sync_pubsub.rs (lines 28-32)
11async fn main() -> Result<(), Box<dyn Error>> {
12    let state = Arc::new(Mutex::new(0));
13
14    loop {
15        let config = Config::new().unwrap();
16
17        let mut client = SyncClient::new(config, state.clone());
18
19        let rx = match client.connect() {
20            Ok(rx) => rx,
21            Err(e) => {
22                println!("Connection error: {}", e);
23                sleep(Duration::from_secs(1));
24                continue;
25            }
26        };
27
28        client.subscribe("topic", |state, msg| {
29            let mut state = state.lock().unwrap();
30            *state += 1;
31            println!("Received message: {} {:?}", state, msg);
32        })?;
33
34        let should_exit;
35        loop {
36            let result = rx.try_recv();
37            match result {
38                Ok(AmqError::TcpServerClosed) => {
39                    should_exit = true;
40                    break;
41                }
42                Ok(e) => {
43                    println!("{}", e);
44                    should_exit = false;
45                    break;
46                }
47                Err(TryRecvError::Empty) => {
48                    sleep(Duration::from_secs(1));
49                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50                }
51                Err(e) => {
52                    println!("Receive signal error: {:?}", e);
53                    should_exit = false;
54                    break;
55                }
56            }
57        }
58
59        if should_exit {
60            break;
61        }
62
63        println!("Reconnecting...");
64    }
65
66    Ok(())
67}
Source

pub fn unsubscribe(&mut self, topic: &str) -> Result<(), AmqError>

§Unsubscribe from a topic.
client.unsubscribe("topic").unwrap();
Source

pub fn publish(&mut self, topic: &str, content: Vec<u8>) -> Result<(), AmqError>

§Publish a message.
client.publish("topic", "Hello, world!".as_bytes().to_vec()).unwrap();
Examples found in repository?
examples/sync_pubsub.rs (line 49)
11async fn main() -> Result<(), Box<dyn Error>> {
12    let state = Arc::new(Mutex::new(0));
13
14    loop {
15        let config = Config::new().unwrap();
16
17        let mut client = SyncClient::new(config, state.clone());
18
19        let rx = match client.connect() {
20            Ok(rx) => rx,
21            Err(e) => {
22                println!("Connection error: {}", e);
23                sleep(Duration::from_secs(1));
24                continue;
25            }
26        };
27
28        client.subscribe("topic", |state, msg| {
29            let mut state = state.lock().unwrap();
30            *state += 1;
31            println!("Received message: {} {:?}", state, msg);
32        })?;
33
34        let should_exit;
35        loop {
36            let result = rx.try_recv();
37            match result {
38                Ok(AmqError::TcpServerClosed) => {
39                    should_exit = true;
40                    break;
41                }
42                Ok(e) => {
43                    println!("{}", e);
44                    should_exit = false;
45                    break;
46                }
47                Err(TryRecvError::Empty) => {
48                    sleep(Duration::from_secs(1));
49                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50                }
51                Err(e) => {
52                    println!("Receive signal error: {:?}", e);
53                    should_exit = false;
54                    break;
55                }
56            }
57        }
58
59        if should_exit {
60            break;
61        }
62
63        println!("Reconnecting...");
64    }
65
66    Ok(())
67}
Source

pub fn consume<F>(&mut self, topic: &str, f: F) -> Result<(), AmqError>
where F: Fn(Arc<T>, Vec<u8>) + Send + Sync + 'static,

§Add a consume callback for a topic.
client.consume("topic", |_state, msg| {
    println!("Received message: {:?}", msg);
}).unwrap();
Source

pub fn unconsume(&mut self, topic: &str) -> Result<(), AmqError>

§Remove a consume callback for a topic.
client.unconsume("topic").unwrap();
Source

pub fn ack(&mut self, message_id: u64) -> Result<(), AmqError>

§Acknowledge a message.
client.ack(message_id).unwrap();
Source

pub fn ack_multi(&mut self, message_ids: Vec<u64>) -> Result<(), AmqError>

§Acknowledge multiple messages.
client.ack_multi(vec![message_id1, message_id2]).unwrap();
Source

pub fn connect(&mut self) -> Result<Receiver<AmqError>, Box<dyn Error>>

§Connect to the server and start the receive task.
Examples found in repository?
examples/sync_pubsub.rs (line 19)
11async fn main() -> Result<(), Box<dyn Error>> {
12    let state = Arc::new(Mutex::new(0));
13
14    loop {
15        let config = Config::new().unwrap();
16
17        let mut client = SyncClient::new(config, state.clone());
18
19        let rx = match client.connect() {
20            Ok(rx) => rx,
21            Err(e) => {
22                println!("Connection error: {}", e);
23                sleep(Duration::from_secs(1));
24                continue;
25            }
26        };
27
28        client.subscribe("topic", |state, msg| {
29            let mut state = state.lock().unwrap();
30            *state += 1;
31            println!("Received message: {} {:?}", state, msg);
32        })?;
33
34        let should_exit;
35        loop {
36            let result = rx.try_recv();
37            match result {
38                Ok(AmqError::TcpServerClosed) => {
39                    should_exit = true;
40                    break;
41                }
42                Ok(e) => {
43                    println!("{}", e);
44                    should_exit = false;
45                    break;
46                }
47                Err(TryRecvError::Empty) => {
48                    sleep(Duration::from_secs(1));
49                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50                }
51                Err(e) => {
52                    println!("Receive signal error: {:?}", e);
53                    should_exit = false;
54                    break;
55                }
56            }
57        }
58
59        if should_exit {
60            break;
61        }
62
63        println!("Reconnecting...");
64    }
65
66    Ok(())
67}
Source

pub fn tcp_conn(&mut self, stream: TcpStream) -> Result<Receiver<AmqError>, Box<dyn Error>>

Source

pub fn unix_conn(&mut self, stream: UnixStream) -> Result<Receiver<AmqError>, Box<dyn Error>>

Source

pub fn shutdown(&mut self)

§Shutdown the client.

Auto Trait Implementations§

§

impl<T> Freeze for Client<T>

§

impl<T> !RefUnwindSafe for Client<T>

§

impl<T> Send for Client<T>

§

impl<T> Sync for Client<T>

§

impl<T> Unpin for Client<T>

§

impl<T> !UnwindSafe for Client<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.