1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
use std::str::FromStr;
use std::sync::Arc;

use garage_net::NetworkKey;

use garage_db as db;

use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
use garage_util::persister::PersisterShared;

use garage_rpc::replication_mode::*;
use garage_rpc::system::System;

use garage_block::manager::*;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;

use crate::s3::block_ref_table::*;
use crate::s3::lifecycle_worker;
use crate::s3::mpu_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;

use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
use crate::index_counter::*;
use crate::key_table::*;

#[cfg(feature = "k2v")]
use crate::k2v::{item_table::*, rpc::*, sub::*};

/// An entire Garage full of data
pub struct Garage {
	/// The parsed configuration Garage is running
	pub config: Config,
	/// The set of background variables that can be viewed/modified at runtime
	pub bg_vars: vars::BgVars,

	/// The replication factor of this cluster
	pub replication_factor: ReplicationFactor,

	/// The local database
	pub db: db::Db,
	/// The membership manager
	pub system: Arc<System>,
	/// The block manager
	pub block_manager: Arc<BlockManager>,

	/// Table containing buckets
	pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
	/// Table containing bucket aliases
	pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
	/// Table containing api keys
	pub key_table: Arc<Table<KeyTable, TableFullReplication>>,

	/// Lock to prevent concurrent modification of buckets and access keys
	bucket_lock: tokio::sync::Mutex<()>,

	/// Table containing S3 objects
	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
	/// Counting table containing object counters
	pub object_counter_table: Arc<IndexCounter<Object>>,
	/// Table containing S3 multipart uploads
	pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
	/// Counting table containing multipart object counters
	pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>,
	/// Table containing S3 object versions
	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
	/// Table containing S3 block references (not blocks themselves)
	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,

	/// Persister for lifecycle worker info
	pub lifecycle_persister: PersisterShared<lifecycle_worker::LifecycleWorkerPersisted>,

	#[cfg(feature = "k2v")]
	pub k2v: GarageK2V,
}

#[cfg(feature = "k2v")]
pub struct GarageK2V {
	/// Table containing K2V items
	pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
	/// Indexing table containing K2V item counters
	pub counter_table: Arc<IndexCounter<K2VItem>>,
	/// K2V RPC handler
	pub rpc: Arc<K2VRpcHandler>,
}

impl Garage {
	/// Create and run garage
	pub fn new(config: Config) -> Result<Arc<Self>, Error> {
		// Create meta dir and data dir if they don't exist already
		std::fs::create_dir_all(&config.metadata_dir)
			.ok_or_message("Unable to create Garage metadata directory")?;
		match &config.data_dir {
			DataDirEnum::Single(data_dir) => {
				std::fs::create_dir_all(data_dir).ok_or_message(format!(
					"Unable to create Garage data directory: {}",
					data_dir.to_string_lossy()
				))?;
			}
			DataDirEnum::Multiple(data_dirs) => {
				for dir in data_dirs {
					std::fs::create_dir_all(&dir.path).ok_or_message(format!(
						"Unable to create Garage data directory: {}",
						dir.path.to_string_lossy()
					))?;
				}
			}
		}

		info!("Opening database...");
		let db_engine = db::Engine::from_str(&config.db_engine)
			.ok_or_message("Invalid `db_engine` value in configuration file")?;
		let mut db_path = config.metadata_dir.clone();
		match db_engine {
			db::Engine::Sqlite => {
				db_path.push("db.sqlite");
			}
			db::Engine::Lmdb => {
				db_path.push("db.lmdb");
			}
		}
		let db_opt = db::OpenOpt {
			fsync: config.metadata_fsync,
			lmdb_map_size: match config.lmdb_map_size {
				v if v == usize::default() => None,
				v => Some(v),
			},
		};
		let db = db::open_db(&db_path, db_engine, &db_opt)
			.ok_or_message("Unable to open metadata db")?;

		info!("Initializing RPC...");
		let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message(
			"rpc_secret value is missing, not present in config file or in environment",
		)?)
		.ok()
		.and_then(|x| NetworkKey::from_slice(&x))
		.ok_or_message("Invalid RPC secret key")?;

		let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;

		info!("Initialize background variable system...");
		let mut bg_vars = vars::BgVars::new();

		info!("Initialize membership management system...");
		let system = System::new(network_key, replication_factor, consistency_mode, &config)?;

		let data_rep_param = TableShardedReplication {
			system: system.clone(),
			replication_factor: replication_factor.into(),
			write_quorum: replication_factor.write_quorum(consistency_mode),
			read_quorum: 1,
		};

		let meta_rep_param = TableShardedReplication {
			system: system.clone(),
			replication_factor: replication_factor.into(),
			write_quorum: replication_factor.write_quorum(consistency_mode),
			read_quorum: replication_factor.read_quorum(consistency_mode),
		};

		let control_rep_param = TableFullReplication {
			system: system.clone(),
		};

		info!("Initialize block manager...");
		let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
		block_manager.register_bg_vars(&mut bg_vars);

		// ---- admin tables ----
		info!("Initialize bucket_table...");
		let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);

		info!("Initialize bucket_alias_table...");
		let bucket_alias_table = Table::new(
			BucketAliasTable,
			control_rep_param.clone(),
			system.clone(),
			&db,
		);
		info!("Initialize key_table_table...");
		let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);

		// ---- S3 tables ----
		info!("Initialize block_ref_table...");
		let block_ref_table = Table::new(
			BlockRefTable {
				block_manager: block_manager.clone(),
			},
			meta_rep_param.clone(),
			system.clone(),
			&db,
		);

		info!("Initialize version_table...");
		let version_table = Table::new(
			VersionTable {
				block_ref_table: block_ref_table.clone(),
			},
			meta_rep_param.clone(),
			system.clone(),
			&db,
		);

		info!("Initialize multipart upload counter table...");
		let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);

		info!("Initialize multipart upload table...");
		let mpu_table = Table::new(
			MultipartUploadTable {
				version_table: version_table.clone(),
				mpu_counter_table: mpu_counter_table.clone(),
			},
			meta_rep_param.clone(),
			system.clone(),
			&db,
		);

		info!("Initialize object counter table...");
		let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);

		info!("Initialize object_table...");
		#[allow(clippy::redundant_clone)]
		let object_table = Table::new(
			ObjectTable {
				version_table: version_table.clone(),
				mpu_table: mpu_table.clone(),
				object_counter_table: object_counter_table.clone(),
			},
			meta_rep_param.clone(),
			system.clone(),
			&db,
		);

		info!("Load lifecycle worker state...");
		let lifecycle_persister =
			PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state");
		lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars);

		// ---- K2V ----
		#[cfg(feature = "k2v")]
		let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);

		// ---- setup block refcount recalculation ----
		// this function can be used to fix inconsistencies in the RC table
		block_manager.set_recalc_rc(vec![
			block_ref_recount_fn(&block_ref_table),
			// other functions could be added here if we had other tables
			// that hold references to data blocks
		]);

		// -- done --
		Ok(Arc::new(Self {
			config,
			bg_vars,
			replication_factor,
			db,
			system,
			block_manager,
			bucket_table,
			bucket_alias_table,
			key_table,
			bucket_lock: tokio::sync::Mutex::new(()),
			object_table,
			object_counter_table,
			mpu_table,
			mpu_counter_table,
			version_table,
			block_ref_table,
			lifecycle_persister,
			#[cfg(feature = "k2v")]
			k2v,
		}))
	}

	pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
		self.block_manager.spawn_workers(bg);

		self.bucket_table.spawn_workers(bg);
		self.bucket_alias_table.spawn_workers(bg);
		self.key_table.spawn_workers(bg);

		self.object_table.spawn_workers(bg);
		self.object_counter_table.spawn_workers(bg);
		self.mpu_table.spawn_workers(bg);
		self.mpu_counter_table.spawn_workers(bg);
		self.version_table.spawn_workers(bg);
		self.block_ref_table.spawn_workers(bg);

		bg.spawn_worker(lifecycle_worker::LifecycleWorker::new(
			self.clone(),
			self.lifecycle_persister.clone(),
		));

		#[cfg(feature = "k2v")]
		self.k2v.spawn_workers(bg);

		if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
			let interval = parse_duration::parse(itv)
				.ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
			if interval < std::time::Duration::from_secs(600) {
				return Err(Error::Message(
					"metadata_auto_snapshot_interval too small or negative".into(),
				));
			}

			bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
				self.clone(),
				interval,
			));
		}

		Ok(())
	}

	pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
		helper::bucket::BucketHelper(self)
	}

	pub fn key_helper(&self) -> helper::key::KeyHelper {
		helper::key::KeyHelper(self)
	}

	pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
		let lock = self.bucket_lock.lock().await;
		helper::locked::LockedHelper(self, lock)
	}
}

#[cfg(feature = "k2v")]
impl GarageK2V {
	fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
		info!("Initialize K2V counter table...");
		let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);

		info!("Initialize K2V subscription manager...");
		let subscriptions = Arc::new(SubscriptionManager::new());

		info!("Initialize K2V item table...");
		let item_table = Table::new(
			K2VItemTable {
				counter_table: counter_table.clone(),
				subscriptions: subscriptions.clone(),
			},
			meta_rep_param,
			system.clone(),
			db,
		);

		info!("Initialize K2V RPC handler...");
		let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);

		Self {
			item_table,
			counter_table,
			rpc,
		}
	}

	pub fn spawn_workers(&self, bg: &BackgroundRunner) {
		self.item_table.spawn_workers(bg);
		self.counter_table.spawn_workers(bg);
	}
}