//! This crate contains client bindings for [Noria](https://github.com/mit-pdos/noria).
//!
//! # What is Noria?
//!
//! Noria is a new streaming data-flow system designed to act as a fast storage backend for
//! read-heavy web applications based on [this paper](https://jon.tsp.io/papers/osdi18-noria.pdf)
//! from [OSDI'18](https://www.usenix.org/conference/osdi18/presentation/gjengset). It acts like a
//! database, but pre-computes and caches relational query results so that reads are blazingly
//! fast. Noria automatically keeps cached results up-to-date as the underlying data, stored in
//! persistent _base tables_, changes. Noria uses partially-stateful data-flow to reduce memory
//! overhead, and supports dynamic, runtime data-flow and query change.
//!
//! # Infrastructure
//!
//! Like most databases, Noria follows a server-client model where many clients connect to a
//! (potentially distributed) server. The server in this case is the `noria-server`
//! binary, and must be started before clients can connect. See `noria-server --help` and the
//! [Noria repository README](https://github.com/mit-pdos/noria) for details. Noria uses
//! [Apache ZooKeeper](https://zookeeper.apache.org/) to announce the location of its servers, so
//! ZooKeeper must also be running.
//!
//! # Quickstart example
//!
//! If you just want to get up and running quickly, here's some code to dig into. Note that this
//! requires a nightly release of Rust to run for the time being.
//!
//! ```no_run
//! # use noria::*;
//! #[tokio::main]
//! async fn main() {
//!     let zookeeper_addr = "127.0.0.1:2181";
//!     let mut db = ControllerHandle::from_zk(zookeeper_addr).await.unwrap();
//!
//!     // if this is the first time we interact with Noria, we must give it the schema
//!     db.install_recipe("
//!         CREATE TABLE Article (aid int, title varchar(255), url text, PRIMARY KEY(aid));
//!         CREATE TABLE Vote (aid int, uid int);
//!     ").await.unwrap();
//!
//!     // we can then get handles that let us insert into the new tables
//!     let mut article = db.table("Article").await.unwrap();
//!     let mut vote = db.table("Vote").await.unwrap();
//!
//!     // let's make a new article
//!     let aid = 42;
//!     let title = "I love Soup";
//!     let url = "https://pdos.csail.mit.edu";
//!     article
//!         .insert(vec![aid.into(), title.into(), url.into()])
//!         .await
//!         .unwrap();
//!
//!     // and then vote for it
//!     vote.insert(vec![aid.into(), 1.into()]).await.unwrap();
//!
//!     // we can also declare views that we want to query
//!     db.extend_recipe("
//!         VoteCount: \
//!           SELECT Vote.aid, COUNT(uid) AS votes \
//!           FROM Vote GROUP BY Vote.aid;
//!         QUERY ArticleWithVoteCount: \
//!           SELECT Article.aid, title, url, VoteCount.votes AS votes \
//!           FROM Article LEFT JOIN VoteCount ON (Article.aid = VoteCount.aid) \
//!           WHERE Article.aid = ?;").await.unwrap();
//!
//!     // and then get handles that let us execute those queries to fetch their results
//!     let mut awvc = db.view("ArticleWithVoteCount").await.unwrap();
//!     // looking up article 42 should yield the article we inserted with a vote count of 1
//!     assert_eq!(
//!         awvc.lookup(&[aid.into()], true).await.unwrap(),
//!         vec![vec![DataType::from(aid), title.into(), url.into(), 1.into()]]
//!     );
//! }
//! ```
//!
//! # Client model
//!
//! Noria accepts a set of parameterized SQL queries (think [prepared
//! statements](https://en.wikipedia.org/wiki/Prepared_statement)), and produces a [data-flow
//! program](https://en.wikipedia.org/wiki/Stream_processing) that maintains [materialized
//! views](https://en.wikipedia.org/wiki/Materialized_view) for the output of those queries. Reads
//! now become fast lookups directly into these materialized views, as if the value had been
//! directly read from a cache (like memcached). The views are automatically kept up-to-date by
//! Noria through the data-flow.
//!
//! Reads work quite differently in Noria compared to traditional relational databases. In
//! particular, a query, or _view_, must be _registered_ before it can be executed, much like SQL
//! prepared statements. Use [`ControllerHandle::extend_recipe`] to register new base tables and
//! views. Once a view has been registered, you can get a handle that lets you execute the
//! corresponding query by passing the view's name to [`ControllerHandle::view`]. The returned
//! [`View`] can be used to query the view with different values for its declared parameters
//! (values in place of `?` in the query) through [`View::lookup`] and [`View::multi_lookup`].
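//!
//! For example, once a one-parameter view has been registered, several keys can be fetched in a
//! single call with [`View::multi_lookup`]. This is a minimal sketch; the view name `UserByID`
//! is hypothetical:
//!
//! ```ignore
//! let mut users = db.view("UserByID").await.unwrap();
//! // fetch the rows for keys 1 and 2, blocking until the results are available
//! let rows = users
//!     .multi_lookup(vec![vec![1.into()], vec![2.into()]], true)
//!     .await
//!     .unwrap();
//! ```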
//!
//! Writes are fairly similar to those in relational databases. To add a new table, you extend the
//! recipe (using [`ControllerHandle::extend_recipe`]) with a `CREATE TABLE` statement, and then
//! use [`ControllerHandle::table`] to get a handle to the new base table. Base tables support
//! operations similar to those of SQL tables, such as [`Table::insert`], [`Table::update`], and
//! [`Table::delete`], as well as more esoteric operations like [`Table::insert_or_update`].
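//!
//! For example, a row can be modified in place with [`Table::update`]. This is a minimal sketch
//! against the `Article` table from the quickstart; it overwrites the title column (index 1) of
//! the row whose primary key is `aid`:
//!
//! ```ignore
//! let mut article = db.table("Article").await.unwrap();
//! article
//!     .update(
//!         vec![aid.into()],
//!         vec![(1, Modification::Set("I love Soup (revised)".into()))],
//!     )
//!     .await
//!     .unwrap();
//! ```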
//!
//! # Alternatives
//!
//! Noria provides a [MySQL adapter](https://github.com/mit-pdos/noria-mysql) that implements the
//! binary MySQL protocol, which provides a compatibility layer for applications that wish to
//! continue to issue ad-hoc MySQL queries through existing MySQL client libraries.
#![feature(type_alias_impl_trait)]
#![deny(missing_docs)]
#![deny(unused_extern_crates)]
#![deny(unreachable_pub)]
#![warn(rust_2018_idioms)]

#[macro_use]
extern crate failure;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate slog;

/// Maximum number of requests that may be in-flight _to_ the connection pool at a time.
///
/// We want this to be > 1 so that multiple threads can enqueue requests at the same time without
/// immediately blocking one another. The exact value is somewhat arbitrary.
///
/// The value isn't higher, because it would unnecessarily consume memory.
///
/// The value isn't lower, because it would mean fewer concurrent enqueues.
///
/// NOTE: This value also places a soft-ish limit on the number of instances you can have of
/// `View`s or `Table`s that include a particular endpoint address when sharding is enabled. The
/// reason for this is kind of subtle: when you `poll_ready` a `View` or `Table`, we internally
/// `poll_ready` all the shards of that `View` or `Table`, each of which is a `tower-buffer`. When
/// you `poll_ready` a `tower-buffer`, it reserves a "slot" in its buffer for the coming request,
/// effectively reducing the capacity of the channel by 1 until the send happens. But, with
/// sharding, it may be that no request then goes to a particular shard. So, you may end up with
/// *all* the slots to a given resource taken up by `poll_ready`s that haven't been used yet. A
/// similar issue arises if you ever do:
///
/// ```ignore
/// view.ready().await;
/// let req = reqs.next().await;
/// view.call(req).await;
/// ```
///
/// When `ready` resolves, it will be holding up a slot in the buffer to `view`. It will continue
/// to hold that up all the way until the next request arrives from `reqs`, which may be a very
/// long time!
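///
/// A sketch of the safer ordering, which only takes up a buffer slot once a request is actually
/// in hand:
///
/// ```ignore
/// let req = reqs.next().await;
/// view.ready().await;
/// view.call(req).await;
/// ```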
///
/// This problem is also described in https://github.com/tower-rs/tower/pull/425 and
/// https://github.com/tower-rs/tower/issues/408#issuecomment-593678194. Ultimately, we need
/// something like https://github.com/tower-rs/tower/issues/408, but for the time being, just make
/// sure this value is high enough.
pub(crate) const BUFFER_TO_POOL: usize = 1024;

/// The number of concurrent connections to a given backend table.
///
/// Since Noria connections are multiplexed, having this value > 1 _only_ allows us to do
/// serialization/deserialization in parallel on multiple threads. Nothing else really.
///
/// The value isn't higher for a couple of reasons:
///
///  - It is per table, which means it is per shard of a domain. Unless _all_ of your requests go
///    to a single shard of one table, you should be fine.
///  - Table operations are generally not bottlenecked on serialization, but on committing.
///
/// The value isn't lower, because we want _some_ concurrency in serialization.
pub(crate) const TABLE_POOL_SIZE: usize = 2;

/// The number of concurrent connections to a given backend view.
///
/// Since Noria connections are multiplexed, having this value > 1 _only_ allows us to do
/// serialization/deserialization in parallel on multiple threads. Nothing else really.
///
/// This value is set higher than the max pool size for tables for a couple of reasons:
///
///  - View connections are made per _host_. So, if you query multiple views that happen to be
///    hosted by a single machine (such as if there is only one Noria worker), this is the total
///    amount of serialization concurrency you will get.
///  - Reads are generally bottlenecked on serialization, so devoting more resources to it seems
///    reasonable.
///
/// The value isn't higher because we, _and the server_, only have so many cores.
pub(crate) const VIEW_POOL_SIZE: usize = 16;

/// Number of requests that can be pending to any particular target.
///
/// Keep in mind that this is really the number of requests that can be pending to any given shard
/// of a domain (for tables) or to any given Noria worker (for views). The value should arguably be
/// higher for views than for tables, since views are more likely to share a connection than
/// tables, but since this is really just a measure for "are we falling over", it can sort of be
/// arbitrarily high. If the system isn't keeping up, the buffer will fill up regardless; it will
/// just take longer.
///
/// We need to limit this since `AsyncBincode` has unlimited buffering, and so will never apply
/// back-pressure otherwise. The backpressure is necessary so that the pool will eventually know
/// that another connection should be established to help with serialization/deserialization.
///
/// The value isn't higher, because it would mean we just allow more data to be buffered internally
/// in the system before we exert backpressure.
///
/// The value isn't lower, because that would give the server less work at a time, which means it
/// can batch less work, which means lower overall efficiency.
pub(crate) const PENDING_LIMIT: usize = 2048;

use petgraph::graph::NodeIndex;
use std::collections::HashMap;
use tokio_tower::multiplex;

mod controller;
mod data;
mod table;
mod view;

#[doc(hidden)]
pub mod channel;
#[doc(hidden)]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub mod consensus;
#[doc(hidden)]
pub mod doc_mock;
#[doc(hidden)]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub mod internal;

// for the row! macro
#[doc(hidden)]
pub use nom_sql::ColumnConstraint;

pub use crate::consensus::ZookeeperAuthority;
use crate::internal::*;
use std::future::Future;
use std::pin::Pin;
use tokio::task_local;

/// The prelude contains most of the types needed in everyday operation.
pub mod prelude {
    pub use super::ActivationResult;
    pub use super::ControllerHandle;
    pub use super::Table;
    pub use super::View;
}

/// Wrapper types for Noria query results.
pub mod results {
    pub use super::view::results::{ResultRow, Results, Row};
}

/// Noria errors.
pub mod error {
    pub use crate::table::TableError;
    pub use crate::view::ViewError;
}

task_local! {
    static TRACE_NEXT: ();
}

/// Returns true if the current task is running within a [`trace_ops_in`] scope.
fn trace_next_op() -> bool {
    TRACE_NEXT.try_with(|_| true).unwrap_or(false)
}

/// Noria reads and writes issued from within the given future will be traced using tokio-trace.
///
/// The trace output is visible by setting the environment variable `RUST_LOG=trace`.
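///
/// A minimal usage sketch (assumes an existing `View` handle named `view`):
///
/// ```ignore
/// let res = noria::trace_ops_in(view.lookup(&[1.into()], true)).await;
/// ```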
pub async fn trace_ops_in<T>(f: impl Future<Output = T>) -> T {
    TRACE_NEXT.scope((), f).await
}

#[derive(Debug, Default)]
#[doc(hidden)]
// only pub because we use it to figure out the error type for ViewError
pub struct Tagger(slab::Slab<()>);

// Tags index into the slab: `assign_tag` reserves a slot for an in-flight request, and
// `finish_tag` frees it when the matching response arrives so the tag can be reused.
impl<Request, Response> multiplex::TagStore<Tagged<Request>, Tagged<Response>> for Tagger {
    type Tag = u32;

    fn assign_tag(mut self: Pin<&mut Self>, r: &mut Tagged<Request>) -> Self::Tag {
        r.tag = self.0.insert(()) as u32;
        r.tag
    }
    fn finish_tag(mut self: Pin<&mut Self>, r: &Tagged<Response>) -> Self::Tag {
        self.0.remove(r.tag as usize);
        r.tag
    }
}

#[doc(hidden)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Tagged<T> {
    pub tag: u32,
    pub v: T,
}

impl<T> From<T> for Tagged<T> {
    fn from(t: T) -> Self {
        Tagged { tag: 0, v: t }
    }
}

pub use crate::controller::{ControllerDescriptor, ControllerHandle};
pub use crate::data::{DataType, Modification, Operation, TableOperation};
pub use crate::table::Table;
pub use crate::view::View;

#[doc(hidden)]
pub use crate::table::Input;

#[doc(hidden)]
pub use crate::view::{ReadQuery, ReadReply};

#[doc(hidden)]
pub mod builders {
    pub use super::table::TableBuilder;
    pub use super::view::ViewBuilder;
}

/// Types used when debugging Noria.
pub mod debug;

/// Represents the result of a recipe activation.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ActivationResult {
    /// Map of query names to `NodeIndex` handles for reads/writes.
    pub new_nodes: HashMap<String, NodeIndex>,
    /// List of leaf nodes that were removed.
    pub removed_leaves: Vec<NodeIndex>,
    /// Number of expressions the recipe added compared to the prior recipe.
    pub expressions_added: usize,
    /// Number of expressions the recipe removed compared to the prior recipe.
    pub expressions_removed: usize,
}

#[doc(hidden)]
#[inline]
pub fn shard_by(dt: &DataType, shards: usize) -> usize {
    match *dt {
        DataType::Int(n) => n as usize % shards,
        DataType::UnsignedInt(n) => n as usize % shards,
        DataType::BigInt(n) => n as usize % shards,
        DataType::UnsignedBigInt(n) => n as usize % shards,
        DataType::Text(..) | DataType::TinyText(..) => {
            use std::hash::Hasher;
            // fixed hash keys so that every client and server agrees on shard placement
            let mut hasher = ahash::AHasher::new_with_keys(0x3306, 0x6033);
            let s: &str = dt.into();
            hasher.write(s.as_bytes());
            hasher.finish() as usize % shards
        }
        // a bit hacky: send all NULL values to the first shard
        DataType::None => 0,
        ref x => {
            unimplemented!("asked to shard on value {:?}", x);
        }
    }
}