changes_stream2/
lib.rs

//! The `changes_stream2` crate gives you a
//! `futures::Stream` of CouchDB `_changes` feed events.
3
4use bytes::{buf::IntoIter, Bytes};
5use futures_util::stream::Stream;
6use log::error;
7use serde_json::Value;
8use std::{mem::replace, pin::Pin, task::Poll};
9
10mod error;
11mod event;
12pub use error::Error;
13pub use event::{Change, ChangeEvent, Event, FinishedEvent};
14
15/// A structure which implements futures::Stream
16pub struct ChangesStream {
17    /// metrics bytes counter
18    #[cfg(feature = "metrics")]
19    bytes: metrics::Counter,
20    /// metrics entries counter
21    #[cfg(feature = "metrics")]
22    entries: metrics::Counter,
23    /// Source of http chunks provided by reqwest
24    source: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>,
25    /// Buffer of current line and current chunk iterator
26    buf: (Vec<u8>, Option<IntoIter<Bytes>>),
27}
28
29impl ChangesStream {
30    /// Constructs a new `ChangesStream` struct with a post payload
31    ///
32    /// Takes a single argument, `db`, which represents the
33    /// url of the data you wish to stream.
34    ///
35    /// For example, to create a new `ChangesStream` struct
36    /// for the npmjs registry, you would write:
37    ///
38    /// ```no_run
39    /// # use changes_stream2::{ChangesStream, Event};
40    /// # use futures_util::stream::StreamExt;
41    /// #
42    /// # #[tokio::main]
43    /// # async fn main() {
44    /// #     let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string();
45    /// #     let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap();
46    /// #     while let Some(event) = changes.next().await {
47    /// #         match event {
48    /// #             Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
49    /// #             Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
50    /// #             Err(err) => println!("Error: {:?}", err),
51    /// #         }
52    /// #     }
53    /// # }
54    /// ```
55    pub async fn with_post_payload(db: String, payload: &Value) -> Result<Self, Error> {
56        let url = url::Url::parse(&db)?;
57        #[cfg(feature = "metrics")]
58        let database = regex::Regex::new(r"(?m)[_/]+")
59            .unwrap()
60            .replace_all(
61                &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()),
62                "_",
63            )
64            .to_string();
65
66        let client = reqwest::Client::new();
67        let res = client.post(url).json(payload).send().await?;
68        let status = res.status();
69        if !status.is_success() {
70            let body = res.text().await.unwrap_or_default();
71            return Err(Error::InvalidResponse { status, body });
72        }
73        let source = Pin::new(Box::new(res.bytes_stream()));
74
75        #[cfg(feature = "metrics")]
76        let (bytes, entries) = {
77            let bytes_name = "couchdb_changes_bytes_total";
78            let entries_name = "couchdb_changes_entries_total";
79            metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes");
80            metrics::describe_counter!(
81                entries_name,
82                metrics::Unit::Count,
83                "Changes stream entries"
84            );
85            (
86                metrics::counter!(bytes_name, "database" => database.clone()),
87                metrics::counter!(entries_name, "database" => database),
88            )
89        };
90
91        Ok(Self {
92            #[cfg(feature = "metrics")]
93            bytes,
94            #[cfg(feature = "metrics")]
95            entries,
96            source,
97            buf: (Vec::new(), None),
98        })
99    }
100
101    /// Constructs a new `ChangesStream` struct
102    ///
103    /// Takes a single argument, `db`, which represents the
104    /// url of the data you wish to stream.
105    ///
106    /// For example, to create a new `ChangesStream` struct
107    /// for the npmjs registry, you would write:
108    ///
109    /// ```no_run
110    /// # use changes_stream2::{ChangesStream, Event};
111    /// # use futures_util::stream::StreamExt;
112    /// #
113    /// # #[tokio::main]
114    /// # async fn main() {
115    /// #     let url = "https://replicate.npmjs.com/_changes".to_string();
116    /// #     let mut changes = ChangesStream::new(url).await.unwrap();
117    /// #     while let Some(event) = changes.next().await {
118    /// #         match event {
119    /// #             Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
120    /// #             Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
121    /// #             Err(err) => println!("Error: {:?}", err),
122    /// #         }
123    /// #     }
124    /// # }
125    /// ```
126    pub async fn new(db: String) -> Result<Self, Error> {
127        Self::with_post_payload(db, &serde_json::json!({})).await
128    }
129}
130
131impl Stream for ChangesStream {
132    type Item = Result<Event, Error>;
133
134    fn poll_next(
135        mut self: std::pin::Pin<&mut Self>,
136        cx: &mut std::task::Context<'_>,
137    ) -> std::task::Poll<Option<Self::Item>> {
138        'main: loop {
139            if self.buf.1.is_none() {
140                match Stream::poll_next(self.source.as_mut(), cx) {
141                    Poll::Pending => return Poll::Pending,
142                    Poll::Ready(None) => return Poll::Ready(None),
143                    Poll::Ready(Some(Ok(chunk))) => {
144                        #[cfg(feature = "metrics")]
145                        self.bytes.increment(chunk.len() as u64);
146
147                        self.buf.1 = Some(chunk.into_iter())
148                    }
149                    Poll::Ready(Some(Err(err))) => {
150                        error!("Error getting next chunk: {:?}", err);
151                        return Poll::Ready(None);
152                    }
153                }
154            } else {
155                let (line, chunk_iter) = &mut self.buf;
156                let iter = chunk_iter.as_mut().unwrap();
157
158                loop {
159                    if let Some(byte) = iter.next() {
160                        if byte == 0x0A {
161                            // Found '\n', end of line
162                            break;
163                        }
164                        line.push(byte);
165                    } else {
166                        // We need another chunk to fill the line
167                        *chunk_iter = None;
168                        continue 'main;
169                    }
170                }
171
172                let line = replace(line, Vec::with_capacity(line.len() * 2));
173                if line.len() < 14 {
174                    // skip prologue, epilogue and empty lines (continuous mode)
175                    continue;
176                }
177
178                let mut len = line.len();
179                if line[len - 1] == 0x0D {
180                    // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n"
181                    len -= 1;
182                }
183                if line[len - 1] == 0x2C {
184                    // 0x2C is ','
185                    len -= 1;
186                }
187
188                let result = serde_json::from_slice::<ChangeEvent>(&line[..len])
189                    .map(Event::Change)
190                    .or_else(|error| {
191                        serde_json::from_slice::<FinishedEvent>(&line[..len])
192                            .map(Event::Finished)
193                            .map_err(|_err| Error::ParsingFailed {
194                                error,
195                                json: String::from_utf8(line).unwrap_or_default(),
196                            })
197                    });
198
199                #[cfg(feature = "metrics")]
200                self.entries.increment(1);
201
202                return Poll::Ready(Some(result));
203            }
204        }
205    }
206}