changes_stream2/
lib.rs

//! The `changes_stream2` crate provides [`ChangesStream`], a
//! `futures::Stream` of CouchDB `_changes` feed events.
3
4use bytes::{buf::IntoIter, Bytes};
5use futures_util::stream::Stream;
6use log::error;
7use serde_json::Value;
8use std::{mem::replace, pin::Pin, task::Poll, time::Duration};
9
10mod error;
11mod event;
12pub use error::Error;
13pub use event::{Change, ChangeEvent, Event, FinishedEvent};
14
/// A [`Stream`] of CouchDB `_changes` feed events.
///
/// Construct one with [`ChangesStream::new`] or
/// [`ChangesStream::with_post_payload`]. The HTTP response body is split into
/// `\n`-terminated lines, and each line is parsed as JSON into an item of type
/// `Result<Event, Error>`.
pub struct ChangesStream {
    /// Counts the response-body bytes received (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    bytes: metrics::Counter,
    /// Counts the items yielded by the stream, both successfully parsed events
    /// and parse failures (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    entries: metrics::Counter,
    /// Chunks of the HTTP response body, as produced by reqwest's `bytes_stream`.
    source: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>,
    /// Parsing state carried between polls:
    /// `.0` holds the bytes of the line currently being assembled (without the
    /// terminating `\n`); `.1` iterates over the unread bytes of the current
    /// chunk, and is `None` when the next chunk must be fetched from `source`.
    buf: (Vec<u8>, Option<IntoIter<Bytes>>),
}
28
29impl ChangesStream {
30    /// Constructs a new `ChangesStream` struct with a post payload
31    ///
32    /// Takes a single argument, `db`, which represents the
33    /// url of the data you wish to stream.
34    ///
35    /// For example, to create a new `ChangesStream` struct
36    /// for the npmjs registry, you would write:
37    ///
38    /// ```no_run
39    /// # use changes_stream2::{ChangesStream, Event};
40    /// # use futures_util::stream::StreamExt;
41    /// #
42    /// # #[tokio::main]
43    /// # async fn main() {
44    /// #     let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string();
45    /// #     let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap();
46    /// #     while let Some(event) = changes.next().await {
47    /// #         match event {
48    /// #             Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
49    /// #             Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
50    /// #             Err(err) => println!("Error: {:?}", err),
51    /// #         }
52    /// #     }
53    /// # }
54    /// ```
55    pub async fn with_post_payload(db: String, payload: &Value) -> Result<Self, Error> {
56        let url = url::Url::parse(&db)?;
57        #[cfg(feature = "metrics")]
58        let database = regex::Regex::new(r"(?m)[_/]+")
59            .unwrap()
60            .replace_all(
61                &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()),
62                "_",
63            )
64            .to_string();
65
66        let client = reqwest::ClientBuilder::new()
67            .connect_timeout(Duration::from_secs(60))
68            .read_timeout(Duration::from_secs(60))
69            .build()?;
70        let res = client.post(url).json(payload).send().await?;
71        let status = res.status();
72        if !status.is_success() {
73            let body = res.text().await.unwrap_or_default();
74            return Err(Error::InvalidResponse { status, body });
75        }
76        let source = Pin::new(Box::new(res.bytes_stream()));
77
78        #[cfg(feature = "metrics")]
79        let (bytes, entries) = {
80            let bytes_name = "couchdb_changes_bytes_total";
81            let entries_name = "couchdb_changes_entries_total";
82            metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes");
83            metrics::describe_counter!(
84                entries_name,
85                metrics::Unit::Count,
86                "Changes stream entries"
87            );
88            (
89                metrics::counter!(bytes_name, "database" => database.clone()),
90                metrics::counter!(entries_name, "database" => database),
91            )
92        };
93
94        Ok(Self {
95            #[cfg(feature = "metrics")]
96            bytes,
97            #[cfg(feature = "metrics")]
98            entries,
99            source,
100            buf: (Vec::new(), None),
101        })
102    }
103
104    /// Constructs a new `ChangesStream` struct
105    ///
106    /// Takes a single argument, `db`, which represents the
107    /// url of the data you wish to stream.
108    ///
109    /// For example, to create a new `ChangesStream` struct
110    /// for the npmjs registry, you would write:
111    ///
112    /// ```no_run
113    /// # use changes_stream2::{ChangesStream, Event};
114    /// # use futures_util::stream::StreamExt;
115    /// #
116    /// # #[tokio::main]
117    /// # async fn main() {
118    /// #     let url = "https://replicate.npmjs.com/_changes".to_string();
119    /// #     let mut changes = ChangesStream::new(url).await.unwrap();
120    /// #     while let Some(event) = changes.next().await {
121    /// #         match event {
122    /// #             Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
123    /// #             Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
124    /// #             Err(err) => println!("Error: {:?}", err),
125    /// #         }
126    /// #     }
127    /// # }
128    /// ```
129    pub async fn new(db: String) -> Result<Self, Error> {
130        Self::with_post_payload(db, &serde_json::json!({})).await
131    }
132}
133
134impl Stream for ChangesStream {
135    type Item = Result<Event, Error>;
136
137    fn poll_next(
138        mut self: std::pin::Pin<&mut Self>,
139        cx: &mut std::task::Context<'_>,
140    ) -> std::task::Poll<Option<Self::Item>> {
141        'main: loop {
142            if self.buf.1.is_none() {
143                match Stream::poll_next(self.source.as_mut(), cx) {
144                    Poll::Pending => return Poll::Pending,
145                    Poll::Ready(None) => return Poll::Ready(None),
146                    Poll::Ready(Some(Ok(chunk))) => {
147                        #[cfg(feature = "metrics")]
148                        self.bytes.increment(chunk.len() as u64);
149
150                        self.buf.1 = Some(chunk.into_iter())
151                    }
152                    Poll::Ready(Some(Err(err))) => {
153                        error!("Error getting next chunk: {:?}", err);
154                        return Poll::Ready(None);
155                    }
156                }
157            } else {
158                let (line, chunk_iter) = &mut self.buf;
159                let iter = chunk_iter.as_mut().unwrap();
160
161                loop {
162                    if let Some(byte) = iter.next() {
163                        if byte == 0x0A {
164                            // Found '\n', end of line
165                            break;
166                        }
167                        line.push(byte);
168                    } else {
169                        // We need another chunk to fill the line
170                        *chunk_iter = None;
171                        continue 'main;
172                    }
173                }
174
175                let line = replace(line, Vec::with_capacity(line.len() * 2));
176                if line.len() < 14 {
177                    // skip prologue, epilogue and empty lines (continuous mode)
178                    continue;
179                }
180
181                let mut len = line.len();
182                if line[len - 1] == 0x0D {
183                    // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n"
184                    len -= 1;
185                }
186                if line[len - 1] == 0x2C {
187                    // 0x2C is ','
188                    len -= 1;
189                }
190
191                let result = serde_json::from_slice::<ChangeEvent>(&line[..len])
192                    .map(Event::Change)
193                    .or_else(|error| {
194                        serde_json::from_slice::<FinishedEvent>(&line[..len])
195                            .map(Event::Finished)
196                            .map_err(|_err| Error::ParsingFailed {
197                                error,
198                                json: String::from_utf8(line).unwrap_or_default(),
199                            })
200                    });
201
202                #[cfg(feature = "metrics")]
203                self.entries.increment(1);
204
205                return Poll::Ready(Some(result));
206            }
207        }
208    }
209}