1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
//! Turn a query into a changefeed, an infinite stream of objects
//! representing changes to the query's results as they occur
//!
//! A changefeed may return changes to a table or an individual document
//! (a "point" changefeed). Commands such as `filter` or `map` may be used
//! before the `changes` command to transform or filter the output, and
//! many commands that operate on sequences can be chained after `changes`.
//!
//! If the table becomes unavailable, the changefeed will be disconnected,
//! and a runtime exception will be thrown by the driver.
//!
//! Changefeed notifications take the form of a two-field object:
//!
//! ```js
//! {
//! "old_val": <document before change>,
//! "new_val": <document after change>
//! }
//! ```
//!
//! When `include_types` is `true`, there will be three fields:
//!
//! ```js
//! {
//! "old_val": <document before change>,
//! "new_val": <document after change>,
//! "type": <result type>
//! }
//! ```
//!
//! When a document is deleted, `new_val` will be `null`; when a document is
//! inserted, `old_val` will be `null`.
//!
//! Certain document transformation commands can be chained before changefeeds.
//! For more information, read the [discussion of changefeeds](https://rethinkdb.com/docs/changefeeds/)
//! in the "Command language" documentation.
//!
//! Changefeeds ignore the `read_mode` flag to `run`, and always behave as if
//! it is set to `single` (i.e., the values they return are in memory on the primary
//! replica, but have not necessarily been written to disk yet). For more details
//! read [Consistency guarantees](https://rethinkdb.com/docs/consistency).
//!
//! The server will buffer up to `changefeed_queue_size` elements (default 100,000).
//! If the buffer limit is hit, early changes will be discarded, and the client will
//! receive an object of the form
//! `{error: "Changefeed cache over array size limit, skipped X elements."}`
//! where `X` is the number of elements skipped.
//!
//! Commands that operate on streams (such as [filter](super::filter) or [map](super::map))
//! can usually be chained after `changes`. However, since the stream produced by
//! `changes` has no ending, commands that need to consume the entire stream before
//! returning (such as [reduce](super::reduce) or [count](super::count)) cannot.
//!
//! # Examples
//!
//! Subscribe to the changes on a table.
//!
//! Start monitoring the changefeed in one client:
//!
//! ```
//! # reql::example(|r, conn| async_stream::stream! {
//! r.table("games").changes(()).run(conn)
//! # });
//! ```
//!
//! As these queries are performed in a second client
//!
//! ```
//! # use serde_json::json;
//! # reql::example(|r, conn| async_stream::stream! {
//! r.table("games").insert(json!({"id": 1})).run(conn)
//! # });
//! ```
//!
//! the first client would receive and print the following objects:
//!
//! ```json
//! {old_val: null, new_val: {id: 1}}
//! ```
use crate::{cmd, Command};
use ql2::term::TermType;
use reql_macros::CommandOptions;
use serde::Serialize;
/// Optional arguments to `changes`
#[derive(Debug, Clone, Copy, CommandOptions, Serialize, Default, PartialEq, PartialOrd)]
#[non_exhaustive]
pub struct Options {
/// Controls how change notifications are batched
#[serde(skip_serializing_if = "Option::is_none")]
pub squash: Option<Squash>,
/// The number of changes the server will buffer between client reads
/// before it starts dropping changes and generates an error
/// (default: 100,000).
#[serde(skip_serializing_if = "Option::is_none")]
pub changefeed_queue_size: Option<u32>,
/// If `true`, the changefeed stream will begin with the current contents
/// of the table or selection being monitored. These initial results will
/// have `new_val` fields, but no `old_val` fields. The initial results
/// may be intermixed with actual changes, as long as an initial result
/// for the changed document has already been given. If an initial result
/// for a document has been sent and a change is made to that document
/// that would move it to the unsent part of the result set (e.g., a
/// changefeed monitors the top 100 posters, the first 50 have been sent,
/// and poster 48 has become poster 52), an "uninitial" notification will
/// be sent, with an `old_val` field but no `new_val` field.
#[serde(skip_serializing_if = "Option::is_none")]
pub include_initial: Option<bool>,
/// If `true`, the changefeed stream will include special status documents
/// consisting of the field `state` and a string indicating a change in the
/// feed's state. These documents can occur at any point in the feed between
/// the notification documents described below. If `includeStates` is `false`
/// (the default), the status documents will not be sent.
#[serde(skip_serializing_if = "Option::is_none")]
pub include_states: Option<bool>,
/// If `true`, a changefeed stream on an `order_by.limit` changefeed will
/// include `old_offset` and `new_offset` fields in status documents that
/// include `old_val` and `new_val`. This allows applications to maintain
/// ordered lists of the stream's result set. If `old_offset` is set and not
/// `null`, the element at `old_offset` is being deleted; if `new_offset` is
/// set and not `null`, then `new_val` is being inserted at `new_offset`.
/// Setting `include_offsets` to `true` on a changefeed that does not support
/// it will raise an error.
#[serde(skip_serializing_if = "Option::is_none")]
pub include_offsets: Option<bool>,
/// If `true`, every result on a changefeed will include a `type` field with
/// a string that indicates the kind of change the result represents:
/// `add`, `remove`, `change`, `initial`, `uninitial`, `state`.
/// Defaults to `false`.
///
/// There are currently two states:
///
/// * `{state: 'initializing'}` indicates the following documents represent
/// initial values on the feed rather than changes. This will be the first
/// document of a feed that returns initial values.
/// * `{state: 'ready'}` indicates the following documents represent changes.
/// This will be the first document of a feed that does *not* return initial
/// values; otherwise, it will indicate the initial values have all been sent.
#[serde(skip_serializing_if = "Option::is_none")]
pub include_types: Option<bool>,
}
/// Controls how change notifications are batched
#[derive(Debug, Clone, Copy, Serialize, PartialEq, PartialOrd)]
#[non_exhaustive]
#[serde(untagged)]
pub enum Squash {
/// `true`: When multiple changes to the same document occur before a
/// batch of notifications is sent, the changes are "squashed" into one
/// change. The client receives a notification that will bring it fully
/// up to date with the server.
/// `false`: All changes will be sent to the client verbatim. This is
/// the default.
Bool(bool),
/// `n`: A numeric value (floating point). Similar to `true`, but the
/// server will wait `n` seconds to respond in order to squash as many
/// changes together as possible, reducing network traffic. The first
/// batch will always be returned immediately.
Float(f32),
}
pub trait Arg {
fn arg(self) -> cmd::Arg<Options>;
}
impl Arg for () {
fn arg(self) -> cmd::Arg<Options> {
Command::new(TermType::Changes)
.mark_change_feed()
.into_arg()
}
}
impl Arg for Options {
fn arg(self) -> cmd::Arg<Options> {
().arg().with_opts(self)
}
}