1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0
use async_trait::async_trait;
use reduct_base::error::ReductError;
use reduct_base::io::RecordMeta;
use reduct_base::msg::replication_api::{
FullReplicationInfo, ReplicationInfo, ReplicationMode, ReplicationSettings,
};
mod diagnostics;
pub mod proto;
mod remote_bucket;
mod replication_repository;
mod replication_sender;
mod replication_task;
mod transaction_filter;
mod transaction_log;
/// Replication event to be synchronized.
#[derive(Debug, Clone, PartialEq)]
pub enum Transaction {
/// Write a record to a bucket (timestamp)
WriteRecord(u64),
/// Update a record in a bucket (timestamp)
UpdateRecord(u64),
}
impl Into<u8> for Transaction {
fn into(self) -> u8 {
match self {
Transaction::WriteRecord(_) => 0,
Transaction::UpdateRecord(_) => 1,
}
}
}
impl Transaction {
pub fn timestamp(&self) -> &u64 {
match self {
Transaction::WriteRecord(ts) => ts,
Transaction::UpdateRecord(ts) => ts,
}
}
pub fn into_timestamp(self) -> u64 {
match self {
Transaction::WriteRecord(ts) => ts,
Transaction::UpdateRecord(ts) => ts,
}
}
}
impl TryFrom<u8> for Transaction {
type Error = ReductError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Transaction::WriteRecord(0)),
1 => Ok(Transaction::UpdateRecord(0)),
_ => Err(ReductError::internal_server_error(
"Invalid transaction type",
)),
}
}
}
#[derive(Debug, Clone)]
pub struct TransactionNotification {
pub bucket: String,
pub entry: String,
pub meta: RecordMeta,
pub event: Transaction,
}
#[async_trait]
pub trait ManageReplications {
/// Create a new replication.
///
/// # Arguments
/// * `name` - Replication name.
/// * `settings` - Replication settings.
///
/// # Errors
///
/// * `ReductError::Conflict` - Replication already exists.
/// * `ReductError::BadRequest` - Invalid destination host.
/// * `ReductError::NotFound` - Source bucket does not exist.
async fn create_replication(
&mut self,
name: &str,
settings: ReplicationSettings,
) -> Result<(), ReductError>;
/// Update an existing replication.
///
/// # Arguments
///
/// * `name` - Replication name.
/// * `settings` - Replication settings.
///
/// # Errors
///
/// A `ReductError` is returned if the update fails.
async fn update_replication(
&mut self,
name: &str,
settings: ReplicationSettings,
) -> Result<(), ReductError>;
/// List all replications.
async fn replications(&self) -> Result<Vec<ReplicationInfo>, ReductError>;
/// Get replication information.
async fn get_info(&self, name: &str) -> Result<FullReplicationInfo, ReductError>;
/// Get replication settings.
async fn get_replication_settings(
&self,
name: &str,
) -> Result<ReplicationSettings, ReductError>;
/// Check if replication worker is running.
async fn is_replication_running(&self, name: &str) -> Result<bool, ReductError>;
/// Mark replication as provisioned/unprovisioned.
async fn set_replication_provisioned(
&mut self,
name: &str,
provisioned: bool,
) -> Result<(), ReductError>;
/// Remove a replication task
async fn remove_replication(&mut self, name: &str) -> Result<(), ReductError>;
/// Update replication mode
async fn set_mode(&mut self, name: &str, mode: ReplicationMode) -> Result<(), ReductError>;
/// Notify replication task about a new transaction.
///
/// # Arguments
///
/// * `notification` - Transaction notification.
///
/// # Errors
///
/// A `ReductError` is returned if the notification fails.
async fn notify(&mut self, notification: TransactionNotification) -> Result<(), ReductError>;
/// Start background workers if they are not running yet.
fn start(&mut self);
/// Stop background workers and wait until they finish.
async fn stop(&mut self);
}
pub(crate) use replication_repository::ReplicationRepoBuilder;