1#[path = "io/mod.rs"]
2mod dcp_io;
3
4use crate::dcp_io::client::Client;
5use crate::dcp_io::consts::Listener;
6pub use crate::dcp_io::consts::ListenerCallback;
7use crate::dcp_io::couchbase::Couchbase;
8use std::net::TcpStream;
9use std::sync::Arc;
10use std::{io, thread};
11
/// Group-level settings nested inside [`DcpConfig`].
pub struct GroupConfig {
    /// Name of the group.
    ///
    /// NOTE(review): presumably identifies the DCP consumer group; this file never
    /// reads the field directly, so confirm the exact meaning against `dcp_io`.
    pub name: String,
}
15
16pub struct DcpConfig {
17 pub group: GroupConfig,
18}
19
20pub struct Config {
21 pub hosts: Vec<String>,
22 pub username: String,
23 pub password: String,
24 pub bucket: String,
25 pub scope_name: String,
26 pub collection_names: Vec<String>,
27 pub dcp: DcpConfig,
28}
29
30pub struct Dcp {
31 config: Arc<Config>,
32 client: Arc<Client>,
33}
34
35impl Dcp {
36 pub fn new(config: Config) -> io::Result<Self> {
37 let tcp_stream = TcpStream::connect(config.hosts[0].as_str())?;
38 let client: Client = Client::new(tcp_stream);
39
40 Ok(Self {
41 config: Arc::new(config),
42 client: Arc::new(client),
43 })
44 }
45
46 pub fn add_listener(&self, callback: Listener) {
47 self.client.add_listener(callback);
48 }
49
50 pub fn start(&self) -> io::Result<()> {
51 let client = Arc::clone(&self.client);
52 let config = Arc::clone(&self.config);
53
54 thread::spawn(move || {
55 let couchbase = Couchbase::new(&client);
56
57 match couchbase.connect(&config) {
58 Ok(..) => log::info!("stream started"),
59 Err(e) => log::error!("stream cannot started: {}", e),
60 }
61 });
62
63 self.client.start()
64 }
65
66 pub fn stop(&self) -> io::Result<()> {
67 self.client.stop();
68 Ok(())
69 }
70}