dcp_rs/
lib.rs

1#[path = "io/mod.rs"]
2mod dcp_io;
3
4use crate::dcp_io::client::Client;
5use crate::dcp_io::consts::Listener;
6pub use crate::dcp_io::consts::ListenerCallback;
7use crate::dcp_io::couchbase::Couchbase;
8use std::net::TcpStream;
9use std::sync::Arc;
10use std::{io, thread};
11
12pub struct GroupConfig {
13    pub name: String,
14}
15
16pub struct DcpConfig {
17    pub group: GroupConfig,
18}
19
20pub struct Config {
21    pub hosts: Vec<String>,
22    pub username: String,
23    pub password: String,
24    pub bucket: String,
25    pub scope_name: String,
26    pub collection_names: Vec<String>,
27    pub dcp: DcpConfig,
28}
29
30pub struct Dcp {
31    config: Arc<Config>,
32    client: Arc<Client>,
33}
34
35impl Dcp {
36    pub fn new(config: Config) -> io::Result<Self> {
37        let tcp_stream = TcpStream::connect(config.hosts[0].as_str())?;
38        let client: Client = Client::new(tcp_stream);
39
40        Ok(Self {
41            config: Arc::new(config),
42            client: Arc::new(client),
43        })
44    }
45
46    pub fn add_listener(&self, callback: Listener) {
47        self.client.add_listener(callback);
48    }
49
50    pub fn start(&self) -> io::Result<()> {
51        let client = Arc::clone(&self.client);
52        let config = Arc::clone(&self.config);
53
54        thread::spawn(move || {
55            let couchbase = Couchbase::new(&client);
56
57            match couchbase.connect(&config) {
58                Ok(..) => log::info!("stream started"),
59                Err(e) => log::error!("stream cannot started: {}", e),
60            }
61        });
62
63        self.client.start()
64    }
65
66    pub fn stop(&self) -> io::Result<()> {
67        self.client.stop();
68        Ok(())
69    }
70}