// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.

//! A key-value database for use in browsers
//!
//! Writes data both into memory and IndexedDB, reads the whole database in memory
//! from the IndexedDB on `open`.

#![deny(missing_docs)]

mod error;
mod indexed_db;

use std::io;
use std::rc::Rc;
use std::sync::Mutex;
use kvdb::{DBValue, DBTransaction};
use kvdb_memorydb::{InMemory, self as in_memory};
use send_wrapper::SendWrapper;

pub use error::Error;
pub use kvdb::KeyValueDB;

use futures::prelude::*;

use web_sys::IdbDatabase;

/// Database backed by both IndexedDB and in memory implementation.
///
/// Reads are answered by the in-memory database; writes are handed to both
/// the IndexedDB handle and the in-memory database.
pub struct Database {
	// Name the database was opened with; returned by `name()`.
	name: String,
	// Version of the IndexedDB database that was opened; returned by `version()`.
	version: u32,
	// Number of columns requested on `open` (not including the default one).
	columns: u32,
	// In-memory database, populated from the IndexedDB content on `open`.
	in_memory: InMemory,
	// Handle to the underlying IndexedDB database; closed when `Database` is dropped.
	indexed_db: Mutex<SendWrapper<IdbDatabase>>,
}

/// Column identifier; the default column is represented as `None`.
type Column = Option<u32>;

/// Translates a store number into a `Column`.
///
/// Number `0` stands for the default column and yields `None`;
/// every other number `n` yields `Some(n - 1)`.
fn number_to_column(col: u32) -> Column {
	match col {
		0 => None,
		n => Some(n - 1),
	}
}


impl Database {
	/// Opens the database with the given name,
	/// and the specified number of columns (not including the default one).
	///
	/// The returned future performs the following steps:
	///
	/// 1. Opens the IndexedDB database `name` without requesting a specific version.
	/// 2. If `columns + 1` exceeds the column count of the opened database, drops it
	///    and reopens it with its version incremented by one.
	/// 3. Reads every column number in `0..=columns` through a cursor and writes the
	///    key-value pairs into a freshly created in-memory database.
	///
	/// # Errors
	///
	/// Resolves to the `error::Error` produced by either `indexed_db::open` call.
	///
	/// # Panics
	///
	/// Panics if the `Rc` holding the IndexedDB handle has already been dropped while
	/// columns are still being read, or if it is still shared once all columns have
	/// been read.
	pub fn open(name: String, columns: u32) -> impl Future<Output = Result<Database, error::Error>> {
		// let's try to open the latest version of the db first
		let open_request = indexed_db::open(name.as_str(), None, columns);
		// `name` is moved into the first closure below (for the reopen),
		// the clone is moved into the last one to build the `Database`.
		let name_clone = name.clone();
		open_request.then(move |db| {
			// propagate an error from the first open attempt
			let db = match db {
				Ok(db) => db,
				Err(err) => return future::Either::Right(future::err(err)),
			};

			// If we need more columns than the latest version has,
			// then bump the version (+ 1 for the default column).
			// In order to bump the version, we close the database
			// and reopen it with a higher version than it was opened with previously.
			// cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
			if columns + 1 > db.columns {
				let next_version = db.version + 1;
				drop(db);
				// both branches are boxed so that they have the same future type
				future::Either::Left(indexed_db::open(name.as_str(), Some(next_version), columns).boxed())
			} else {
				future::Either::Left(future::ok(db).boxed())
			}
		// populate the in_memory db from the IndexedDB
		}).then(move |db| {
			// propagate an error from either open attempt
			let db = match db {
				Ok(db) => db,
				Err(err) => return future::Either::Right(future::err(err)),
			};

			let indexed_db::IndexedDB { version, inner, .. } = db;
			// `inner.take()` yields the handle that ends up in the `indexed_db` field;
			// it is kept in an `Rc` while the columns are being read.
			let rc = Rc::new(inner.take());
			// The reading closure only captures a weak reference; presumably so that
			// `Rc::try_unwrap` below sees a single strong reference.
			let weak = Rc::downgrade(&rc);
			// read the columns from the IndexedDB
			future::Either::Left(stream::iter(0..=columns).map(move |n| {
				let db = weak.upgrade().expect("rc should live at least as long; qed");
				// collect all the key-value pairs of column number `n` into one transaction
				indexed_db::idb_cursor(&db, n).fold(DBTransaction::new(), move |mut txn, (key, value)| {
					// number 0 maps to the default column (`None`)
					let column = number_to_column(n);
					txn.put_vec(column, key.as_ref(), value);
					future::ready(txn)
				})
			// write each column into memory
			}).fold(in_memory::create(columns), |m, txn| {
				// `txn` is the future resolving to the transaction of one column
				txn.then(|txn| {
					m.write_buffered(txn);
					future::ready(m)
				})
			}).then(move |in_memory| future::ok(Database {
				name: name_clone,
				version,
				columns,
				in_memory,
				// take the handle back out of the `Rc` now that all the columns have been read
				indexed_db: Mutex::new(SendWrapper::new(
					Rc::try_unwrap(rc).expect("should have only 1 ref at this point; qed")
				)),
			})))
		})
	}

	/// Get the database name, as passed to `open`.
	pub fn name(&self) -> &str {
		self.name.as_str()
	}

	/// Get the database version, i.e. the version of the IndexedDB database
	/// that was opened by `open`.
	pub fn version(&self) -> u32 {
		self.version
	}
}

impl Drop for Database {
	/// Closes the IndexedDB handle when the database goes out of scope.
	fn drop(&mut self) {
		// `&mut self` gives exclusive access, so the handle can be reached without
		// taking the lock. A poisoned mutex is ignored and the handle is left as is.
		if let Ok(handle) = self.indexed_db.get_mut() {
			handle.close();
		}
	}
}

impl KeyValueDB for Database {
	/// Looks up `key` in column `col` of the in-memory database.
	fn get(&self, col: Option<u32>, key: &[u8]) -> io::Result<Option<DBValue>> {
		let memory = &self.in_memory;
		memory.get(col, key)
	}

	/// Prefix lookup in column `col`, answered by the in-memory database.
	fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
		let memory = &self.in_memory;
		memory.get_by_prefix(col, prefix)
	}

	/// Hands the transaction to IndexedDB (by reference) and then moves it
	/// into the in-memory database.
	fn write_buffered(&self, transaction: DBTransaction) {
		// Best effort: if the mutex is poisoned the IndexedDB commit is skipped,
		// and whatever the commit returns is discarded.
		if let Ok(idb) = self.indexed_db.lock() {
			let _ = indexed_db::idb_commit_transaction(&*idb, &transaction, self.columns);
		}

		let memory = &self.in_memory;
		memory.write_buffered(transaction);
	}

	/// Does nothing and always succeeds.
	fn flush(&self) -> io::Result<()> {
		Ok(())
	}

	/// Iterates over column `col` of the in-memory database.
	///
	/// NOTE: clones the whole db
	fn iter<'a>(&'a self, col: Option<u32>) -> Box<dyn Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
		let memory = &self.in_memory;
		memory.iter(col)
	}

	/// Iterates over column `col` of the in-memory database, starting from `prefix`.
	///
	/// NOTE: clones the whole db
	fn iter_from_prefix<'a>(
		&'a self,
		col: Option<u32>,
		prefix: &'a [u8],
	) -> Box<dyn Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
		let memory = &self.in_memory;
		memory.iter_from_prefix(col, prefix)
	}

	/// Restoring is not supported: always returns an `io::ErrorKind::Other` error.
	fn restore(&self, _new_db: &str) -> io::Result<()> {
		let unsupported = io::Error::new(io::ErrorKind::Other, "Not supported yet");
		Err(unsupported)
	}
}