1use crate::cfg::{Cfg, DEFAULT_PORT};
5use crate::core::file_cache::FILE_CACHE;
6use crate::core::sync::AsyncRwLock;
7use crate::replication::proto::replication_repo::Item;
8use crate::replication::proto::{
9 Label as ProtoLabel, ReplicationMode as ProtoReplicationMode,
10 ReplicationRepo as ProtoReplicationRepo, ReplicationSettings as ProtoReplicationSettings,
11};
12use crate::replication::replication_task::ReplicationTask;
13use crate::replication::{ManageReplications, TransactionNotification};
14use crate::storage::engine::StorageEngine;
15use crate::storage::query::condition::Parser;
16use crate::storage::query::filters::WhenFilter;
17use async_trait::async_trait;
18use bytes::Bytes;
19use log::{debug, error, warn};
20use prost::Message;
21use reduct_base::error::ReductError;
22use reduct_base::msg::replication_api::{
23 FullReplicationInfo, ReplicationInfo, ReplicationMode, ReplicationSettings,
24};
25use reduct_base::{not_found, unprocessable_entity};
26use std::collections::HashMap;
27use std::convert::TryFrom;
28use std::io::SeekFrom::Start;
29use std::io::{Read, Write};
30use std::path::PathBuf;
31use std::sync::Arc;
32use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
33use tokio::task::JoinHandle;
34use url::Url;
35
36const REPLICATION_REPO_FILE_NAME: &str = ".replications";
37
38impl From<ReplicationSettings> for ProtoReplicationSettings {
39 fn from(settings: ReplicationSettings) -> Self {
40 Self {
41 src_bucket: settings.src_bucket,
42 dst_bucket: settings.dst_bucket,
43 dst_host: settings.dst_host,
44 dst_token: settings.dst_token.unwrap_or_default(),
45 entries: settings.entries,
46 include: settings
47 .include
48 .into_iter()
49 .map(|(k, v)| ProtoLabel { name: k, value: v })
50 .collect(),
51 exclude: settings
52 .exclude
53 .into_iter()
54 .map(|(k, v)| ProtoLabel { name: k, value: v })
55 .collect(),
56 each_s: settings.each_s.unwrap_or(0.0),
57 each_n: settings.each_n.unwrap_or(0),
58 when: settings.when.map(|value| value.to_string()),
59 mode: ProtoReplicationMode::from(&settings.mode) as i32,
60 }
61 }
62}
63
64impl From<ProtoReplicationSettings> for ReplicationSettings {
65 fn from(settings: ProtoReplicationSettings) -> Self {
66 Self {
67 src_bucket: settings.src_bucket,
68 dst_bucket: settings.dst_bucket,
69 dst_host: settings.dst_host,
70 dst_token: if settings.dst_token.is_empty() {
71 None
72 } else {
73 Some(settings.dst_token)
74 },
75 entries: settings.entries,
76 include: settings
77 .include
78 .into_iter()
79 .map(|label| (label.name, label.value))
80 .collect(),
81 exclude: settings
82 .exclude
83 .into_iter()
84 .map(|label| (label.name, label.value))
85 .collect(),
86 each_s: if settings.each_s > 0.0 {
87 Some(settings.each_s)
88 } else {
89 None
90 },
91 each_n: if settings.each_n > 0 {
92 Some(settings.each_n)
93 } else {
94 None
95 },
96 when: if let Some(when) = settings.when {
97 match serde_json::from_str(&when) {
98 Ok(value) => Some(value),
99 Err(err) => {
100 error!(
101 "Failed to parse 'when' field: {} in replication settings: {}",
102 err, when
103 );
104 None
105 }
106 }
107 } else {
108 None
109 },
110 mode: ProtoReplicationMode::try_from(settings.mode)
111 .unwrap_or(ProtoReplicationMode::Enabled)
112 .into(),
113 }
114 }
115}
116
117impl From<&ReplicationMode> for ProtoReplicationMode {
118 fn from(mode: &ReplicationMode) -> Self {
119 match mode {
120 ReplicationMode::Enabled => ProtoReplicationMode::Enabled,
121 ReplicationMode::Paused => ProtoReplicationMode::Paused,
122 ReplicationMode::Disabled => ProtoReplicationMode::Disabled,
123 }
124 }
125}
126
127impl From<ProtoReplicationMode> for ReplicationMode {
128 fn from(mode: ProtoReplicationMode) -> Self {
129 match mode {
130 ProtoReplicationMode::Enabled => ReplicationMode::Enabled,
131 ProtoReplicationMode::Paused => ReplicationMode::Paused,
132 ProtoReplicationMode::Disabled => ReplicationMode::Disabled,
133 }
134 }
135}
136
137enum NotificationCommand {
140 Notify(TransactionNotification),
141 Stop,
142}
143
144pub(crate) struct ReplicationRepository {
145 replications: Arc<AsyncRwLock<HashMap<String, ReplicationTask>>>,
146 storage: Arc<StorageEngine>,
147 repo_path: PathBuf,
148 config: Cfg,
149 started: bool,
150 notification_tx: UnboundedSender<NotificationCommand>,
151 notification_worker: Option<JoinHandle<()>>,
152}
153
154#[async_trait]
155impl ManageReplications for ReplicationRepository {
156 async fn create_replication(
157 &mut self,
158 name: &str,
159 settings: ReplicationSettings,
160 ) -> Result<(), ReductError> {
161 if self.replications.read().await?.contains_key(name) {
163 return Err(ReductError::conflict(&format!(
164 "Replication '{}' already exists",
165 name
166 )));
167 }
168
169 self.create_or_update_replication_task(&name, settings)
170 .await
171 }
172
173 async fn update_replication(
174 &mut self,
175 name: &str,
176 settings: ReplicationSettings,
177 ) -> Result<(), ReductError> {
178 match self.replications.read().await?.get(name) {
180 Some(replication) => {
181 if replication.is_provisioned() {
182 Err(ReductError::conflict(&format!(
183 "Can't update provisioned replication '{}'",
184 name
185 )))
186 } else {
187 Ok(())
188 }
189 }
190 None => Err(ReductError::not_found(&format!(
191 "Replication '{}' does not exist",
192 name
193 ))),
194 }?;
195
196 self.create_or_update_replication_task(&name, settings)
197 .await
198 }
199
200 async fn replications(&self) -> Result<Vec<ReplicationInfo>, ReductError> {
201 let mut replications = Vec::new();
202 let guard = self.replications.read().await?;
203 for (_, replication) in guard.iter() {
204 replications.push(replication.info().await?);
205 }
206 Ok(replications)
207 }
208
209 async fn get_info(&self, name: &str) -> Result<FullReplicationInfo, ReductError> {
210 let guard = self.replications.read().await?;
211 let replication = guard.get(name).ok_or_else(|| {
212 ReductError::not_found(&format!("Replication '{}' does not exist", name))
213 })?;
214 let info = FullReplicationInfo {
215 info: replication.info().await?,
216 settings: replication.masked_settings().clone(),
217 diagnostics: replication.diagnostics().await?,
218 };
219 Ok(info)
220 }
221
222 async fn get_replication_settings(
223 &self,
224 name: &str,
225 ) -> Result<ReplicationSettings, ReductError> {
226 let guard = self.replications.read().await?;
227 guard
228 .get(name)
229 .map(|replication| replication.settings().clone())
230 .ok_or_else(|| {
231 ReductError::not_found(&format!("Replication '{}' does not exist", name))
232 })
233 }
234
235 async fn is_replication_running(&self, name: &str) -> Result<bool, ReductError> {
236 let guard = self.replications.read().await?;
237 guard
238 .get(name)
239 .map(|replication| replication.is_running())
240 .ok_or_else(|| {
241 ReductError::not_found(&format!("Replication '{}' does not exist", name))
242 })
243 }
244
245 async fn set_replication_provisioned(
246 &mut self,
247 name: &str,
248 provisioned: bool,
249 ) -> Result<(), ReductError> {
250 let mut guard = self.replications.write().await?;
251 let replication = guard.get_mut(name).ok_or_else(|| {
252 ReductError::not_found(&format!("Replication '{}' does not exist", name))
253 })?;
254 replication.set_provisioned(provisioned);
255 Ok(())
256 }
257
258 async fn remove_replication(&mut self, name: &str) -> Result<(), ReductError> {
259 let mut guard = self.replications.write().await?;
260 let repl = guard.get(name).ok_or_else(|| {
261 ReductError::not_found(&format!("Replication '{}' does not exist", name))
262 })?;
263 if repl.is_provisioned() {
264 return Err(ReductError::conflict(&format!(
265 "Can't remove provisioned replication '{}'",
266 name
267 )));
268 }
269 let removed = guard.remove(name);
270 drop(guard);
271 if let Some(mut repl) = removed {
272 repl.stop().await;
273 }
274 self.save_repo().await
275 }
276
277 async fn set_mode(&mut self, name: &str, mode: ReplicationMode) -> Result<(), ReductError> {
278 let mut guard = self.replications.write().await?;
279 let replication = guard.get_mut(name).ok_or_else(|| {
280 ReductError::not_found(&format!("Replication '{}' does not exist", name))
281 })?;
282 replication.set_mode(mode);
283 drop(guard);
284 self.save_repo().await
285 }
286
287 async fn notify(&mut self, notification: TransactionNotification) -> Result<(), ReductError> {
288 let should_enqueue = {
289 let guard = self.replications.read().await?;
290 guard
291 .iter()
292 .any(|(_, replication)| replication.settings().src_bucket == notification.bucket)
293 };
294 if should_enqueue {
295 self.notification_tx
296 .send(NotificationCommand::Notify(notification))
297 .map_err(|_| {
298 ReductError::internal_server_error("Failed to enqueue replication notification")
299 })?;
300 }
301 Ok(())
302 }
303
304 fn start(&mut self) {
305 self.start_all();
306 }
307
308 async fn stop(&mut self) {
309 let _ = self.notification_tx.send(NotificationCommand::Stop);
310 if let Some(worker) = self.notification_worker.take() {
311 if let Err(err) = worker.await {
312 error!("Failed to join replication notification worker: {:?}", err);
313 }
314 }
315
316 let mut guard = self.replications.write().await.unwrap();
317 for (_, task) in guard.iter_mut() {
318 task.stop().await;
319 }
320 }
321}
322
323impl ReplicationRepository {
324 pub(crate) async fn load_or_create(storage: Arc<StorageEngine>, config: Cfg) -> Self {
325 let repo_path = storage.data_path().join(REPLICATION_REPO_FILE_NAME);
326 let replications = Arc::new(AsyncRwLock::new(HashMap::<String, ReplicationTask>::new()));
327 let (notification_tx, mut notification_rx) = unbounded_channel::<NotificationCommand>();
328 let worker_replications = Arc::clone(&replications);
329 let notification_worker = tokio::spawn(async move {
330 while let Some(command) = notification_rx.recv().await {
331 match command {
332 NotificationCommand::Notify(notification) => {
333 let mut replications = match worker_replications.write().await {
334 Ok(guard) => guard,
335 Err(err) => {
336 error!("Failed to lock replication map: {}", err);
337 continue;
338 }
339 };
340
341 for (_, replication) in replications.iter_mut() {
342 if replication.settings().src_bucket != notification.bucket {
343 continue;
344 }
345
346 if let Err(err) = replication.notify(notification.clone()).await {
347 error!("Failed to notify replication task: {}", err);
348 }
349 }
350 }
351 NotificationCommand::Stop => break,
352 }
353 }
354 });
355
356 let mut repo = Self {
357 replications,
358 storage,
359 repo_path,
360 config,
361 started: false,
362 notification_tx,
363 notification_worker: Some(notification_worker),
364 };
365
366 let read_conf_file = async || {
367 let mut lock = FILE_CACHE
368 .write_or_create(&repo.repo_path, Start(0))
369 .await?;
370
371 let mut buf = Vec::new();
372 lock.read_to_end(&mut buf)?;
373 Ok::<Vec<u8>, ReductError>(buf)
374 };
375
376 match read_conf_file().await {
377 Ok(buf) => {
378 debug!(
379 "Reading replication repository from {}",
380 repo.repo_path.as_os_str().to_str().unwrap_or("...")
381 );
382 let proto_repo = ProtoReplicationRepo::decode(&mut Bytes::from(buf))
383 .expect("Error decoding replication repository");
384 for item in proto_repo.replications {
385 if let Err(err) = repo
386 .create_replication(&item.name, item.settings.unwrap().into())
387 .await
388 {
389 error!("Failed to load replication '{}': {}", item.name, err);
390 }
391 }
392 }
393 Err(err) => {
394 warn!(
395 "Failed to read replication repository from {}: {}",
396 repo.repo_path.as_os_str().to_str().unwrap_or("..."),
397 err
398 );
399 }
400 }
401 repo
402 }
403
404 async fn save_repo(&self) -> Result<(), ReductError> {
405 let replications = self.replications.read().await?;
406 let proto_repo = ProtoReplicationRepo {
407 replications: replications
408 .iter()
409 .map(|(name, replication)| Item {
410 name: name.clone(),
411 settings: Some(replication.settings().clone().into()),
412 })
413 .collect(),
414 };
415
416 let mut buf = Vec::new();
417 proto_repo
418 .encode(&mut buf)
419 .expect("Error encoding replication repository");
420
421 let mut file = FILE_CACHE
422 .write_or_create(&self.repo_path, Start(0))
423 .await?;
424 file.set_len(0)?;
425 file.write_all(&buf)?;
426 file.sync_all().await?;
427
428 Ok(())
429 }
430
431 async fn create_or_update_replication_task(
432 &mut self,
433 name: &str,
434 settings: ReplicationSettings,
435 ) -> Result<(), ReductError> {
436 let dest_url = match Url::parse(&settings.dst_host) {
438 Ok(url) => url,
439
440 Err(_) => {
441 return Err(unprocessable_entity!(
442 "Invalid destination host '{}'",
443 settings.dst_host
444 ))
445 }
446 };
447
448 if self.storage.get_bucket(&settings.src_bucket).await.is_err() {
450 return Err(not_found!(
451 "Source bucket '{}' for replication '{}' does not exist",
452 settings.src_bucket,
453 name
454 ));
455 }
456
457 if settings.src_bucket == settings.dst_bucket
459 && self.config.replication_conf.listening_port
460 == dest_url.port_or_known_default().unwrap_or(DEFAULT_PORT)
461 && ["127.0.0.1", "localhost", "0.0.0.0"].contains(&dest_url.host_str().unwrap_or(""))
462 {
463 return Err(unprocessable_entity!(
464 "Source and destination buckets must be different",
465 ));
466 }
467
468 let mut conf = self.config.clone();
470 if let Some(when) = &settings.when {
471 let (cond, directives) = match Parser::new().parse(when.clone()) {
472 Ok((cond, dirs)) => (cond, dirs),
473 Err(err) => {
474 return Err(unprocessable_entity!(
475 "Invalid replication condition: {}",
476 err.message
477 ))
478 }
479 };
480
481 let filer = WhenFilter::<TransactionNotification>::try_new(
482 cond,
483 directives,
484 self.config.io_conf.clone(),
485 true,
486 )?;
487 conf.io_conf = filer.io_config().clone();
488 }
489
490 let mut removed = self.replications.write().await?.remove(name);
492
493 let init_token = settings.dst_token.clone().or_else(|| {
495 removed
496 .as_ref()
497 .and_then(|r| r.settings().dst_token.clone())
498 });
499
500 if let Some(mut old) = removed.take() {
501 old.stop().await;
502 }
503
504 let mut settings = settings;
505 settings.dst_token = init_token;
506
507 let replication =
508 ReplicationTask::new(name.to_string(), settings, conf, Arc::clone(&self.storage));
509 let mut replication = replication;
510 if self.started {
511 replication.start();
512 }
513 self.replications
514 .write()
515 .await?
516 .insert(name.to_string(), replication);
517 self.save_repo().await
518 }
519}
520
521impl ReplicationRepository {
522 pub(crate) fn start_all(&mut self) {
523 if self.started {
524 return;
525 }
526
527 if let Some(mut replications) = self.replications.try_write() {
528 for (_, task) in replications.iter_mut() {
529 task.start();
530 }
531 } else {
532 let replications = Arc::clone(&self.replications);
533 tokio::spawn(async move {
534 let mut replications = match replications.write().await {
535 Ok(guard) => guard,
536 Err(err) => {
537 error!("Failed to lock replication map: {}", err);
538 return;
539 }
540 };
541 for (_, task) in replications.iter_mut() {
542 task.start();
543 }
544 });
545 }
546 self.started = true;
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use crate::core::sync::{reset_rwlock_config, set_rwlock_timeout};
554 use crate::replication::Transaction::WriteRecord;
555 use reduct_base::msg::bucket_api::BucketSettings;
556 use reduct_base::{conflict, internal_server_error, not_found, Labels};
557 use rstest::*;
558 use serial_test::serial;
559 use tokio::time::{sleep, Duration};
560
561 mod create {
562 use super::*;
563 #[rstest]
564 #[tokio::test]
565 async fn create_replication(
566 #[future] mut repo: ReplicationRepository,
567 settings: ReplicationSettings,
568 ) {
569 let mut repo = repo.await;
570 repo.create_replication("test", settings.clone())
571 .await
572 .unwrap();
573
574 let repls = repo.replications().await.unwrap();
575 assert_eq!(repls.len(), 1);
576 assert_eq!(repls[0].name, "test");
577 assert_eq!(
578 repo.get_replication_settings("test").await.unwrap(),
579 settings,
580 "Should create replication with the same name and settings"
581 );
582 }
583
584 #[rstest]
585 #[tokio::test]
586 async fn create_replication_with_same_name(
587 #[future] mut repo: ReplicationRepository,
588 settings: ReplicationSettings,
589 ) {
590 let mut repo = repo.await;
591 repo.create_replication("test", settings.clone())
592 .await
593 .unwrap();
594
595 assert_eq!(
596 repo.create_replication("test", settings).await,
597 Err(conflict!("Replication 'test' already exists")),
598 "Should not create replication with the same name"
599 );
600 }
601
602 #[rstest]
603 #[tokio::test]
604 async fn create_replication_with_invalid_url(
605 #[future] mut repo: ReplicationRepository,
606 settings: ReplicationSettings,
607 ) {
608 let mut repo = repo.await;
609 let mut settings = settings;
610 settings.dst_host = "invalid_url".to_string();
611
612 assert_eq!(
613 repo.create_replication("test", settings).await,
614 Err(unprocessable_entity!(
615 "Invalid destination host 'invalid_url'"
616 )),
617 "Should not create replication with invalid url"
618 );
619 }
620
621 #[rstest]
622 #[tokio::test]
623 async fn create_replication_to_same_bucket(
624 #[future] mut repo: ReplicationRepository,
625 settings: ReplicationSettings,
626 ) {
627 let mut repo = repo.await;
628 let mut settings = settings;
629 settings.dst_host = format!("http://localhost:{}", DEFAULT_PORT);
630 settings.dst_bucket = "bucket-1".to_string();
631
632 assert_eq!(
633 repo.create_replication("test", settings).await,
634 Err(unprocessable_entity!(
635 "Source and destination buckets must be different"
636 )),
637 "Should not create replication to the same bucket"
638 );
639 }
640
641 #[rstest]
642 #[tokio::test]
643 async fn test_replication_src_bucket_not_found(
644 #[future] mut repo: ReplicationRepository,
645 mut settings: ReplicationSettings,
646 ) {
647 let mut repo = repo.await;
648 settings.src_bucket = "bucket-2".to_string();
649 assert_eq!(
650 repo.create_replication("test", settings).await,
651 Err(not_found!(
652 "Source bucket 'bucket-2' for replication 'test' does not exist"
653 )),
654 "Should not create replication with non existing source bucket"
655 );
656 }
657
658 #[rstest]
659 #[tokio::test]
660 async fn test_replication_with_invalid_when_condition(
661 #[future] mut repo: ReplicationRepository,
662 settings: ReplicationSettings,
663 ) {
664 let mut repo = repo.await;
665 let mut settings = settings;
666 settings.when = Some(serde_json::json!({"$UNKNOWN_OP": ["&x", "y"]}));
667 assert_eq!(
668 repo.create_replication("test", settings).await,
669 Err(unprocessable_entity!(
670 "Invalid replication condition: Operator '$UNKNOWN_OP' not supported"
671 )),
672 "Should not create replication with invalid when condition"
673 );
674 }
675
676 #[rstest]
677 #[tokio::test]
678 async fn create_and_load_replications(
679 #[future] storage: Arc<StorageEngine>,
680 settings: ReplicationSettings,
681 ) {
682 let storage = storage.await;
683 let mut repo =
684 ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
685 repo.create_replication("test", settings.clone())
686 .await
687 .unwrap();
688
689 let repo =
690 ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
691 assert_eq!(repo.replications().await.unwrap().len(), 1);
692 assert_eq!(
693 repo.get_replication_settings("test").await.unwrap(),
694 settings,
695 "Should load replication from file"
696 );
697 }
698 }
699
700 mod update {
701 use super::*;
702 #[rstest]
703 #[tokio::test]
704 async fn test_update_replication(
705 #[future] mut repo: ReplicationRepository,
706 settings: ReplicationSettings,
707 ) {
708 let mut repo = repo.await;
709 repo.create_replication("test", settings.clone())
710 .await
711 .unwrap();
712
713 let mut settings = settings;
714 settings.dst_bucket = "bucket-3".to_string();
715 repo.update_replication("test", settings.clone())
716 .await
717 .unwrap();
718
719 let replication = repo.get_replication_settings("test").await.unwrap();
720 assert_eq!(replication.dst_bucket, "bucket-3");
721 }
722
723 #[rstest]
724 #[tokio::test]
725 async fn test_update_provisioned_replication(
726 #[future] mut repo: ReplicationRepository,
727 settings: ReplicationSettings,
728 ) {
729 let mut repo = repo.await;
730 repo.create_replication("test", settings.clone())
731 .await
732 .unwrap();
733
734 repo.set_replication_provisioned("test", true)
735 .await
736 .unwrap();
737
738 assert_eq!(
739 repo.update_replication("test", settings).await,
740 Err(conflict!("Can't update provisioned replication 'test'")),
741 "Should not update provisioned replication"
742 );
743 }
744
745 #[rstest]
746 #[tokio::test]
747 async fn test_update_non_existing_replication(#[future] mut repo: ReplicationRepository) {
748 let mut repo = repo.await;
749 assert_eq!(
750 repo.update_replication("test-2", ReplicationSettings::default())
751 .await,
752 Err(not_found!("Replication 'test-2' does not exist")),
753 "Should not update non existing replication"
754 );
755 }
756
757 #[rstest]
758 #[tokio::test]
759 async fn test_update_replication_with_invalid_url(
760 #[future] mut repo: ReplicationRepository,
761 settings: ReplicationSettings,
762 ) {
763 let mut repo = repo.await;
764 repo.create_replication("test", settings.clone())
765 .await
766 .unwrap();
767
768 let mut settings = settings;
769 settings.dst_host = "invalid_url".to_string();
770
771 assert_eq!(
772 repo.update_replication("test", settings).await,
773 Err(unprocessable_entity!(
774 "Invalid destination host 'invalid_url'"
775 )),
776 "Should not update replication with invalid url"
777 );
778 }
779
780 #[rstest]
781 #[tokio::test]
782 async fn test_update_replication_to_same_bucket(
783 #[future] mut repo: ReplicationRepository,
784 settings: ReplicationSettings,
785 ) {
786 let mut repo = repo.await;
787 repo.create_replication("test", settings.clone())
788 .await
789 .unwrap();
790
791 let mut settings = settings;
792 settings.dst_host = format!("http://localhost:{}", DEFAULT_PORT);
793 settings.dst_bucket = "bucket-1".to_string();
794
795 assert_eq!(
796 repo.update_replication("test", settings).await,
797 Err(unprocessable_entity!(
798 "Source and destination buckets must be different"
799 )),
800 "Should not update replication to the same bucket"
801 );
802 }
803
804 #[rstest]
805 #[tokio::test]
806 async fn test_update_replication_src_bucket_not_found(
807 #[future] mut repo: ReplicationRepository,
808 settings: ReplicationSettings,
809 ) {
810 let mut repo = repo.await;
811 repo.create_replication("test", settings.clone())
812 .await
813 .unwrap();
814
815 let mut settings = settings;
816 settings.src_bucket = "bucket-2".to_string();
817
818 assert_eq!(
819 repo.update_replication("test", settings).await,
820 Err(not_found!(
821 "Source bucket 'bucket-2' for replication 'test' does not exist"
822 )),
823 "Should not update replication with non existing source bucket"
824 );
825 }
826
827 #[rstest]
828 #[tokio::test]
829 async fn test_remove_old_replication_only_for_valid(
830 #[future] mut repo: ReplicationRepository,
831 mut settings: ReplicationSettings,
832 ) {
833 let mut repo = repo.await;
834 repo.create_replication("test", settings.clone())
835 .await
836 .unwrap();
837 settings.when = Some(serde_json::json!({"$not-exist": [true, true]}));
838
839 let err = repo
840 .update_replication("test", settings)
841 .await
842 .err()
843 .unwrap();
844 assert_eq!(
845 err,
846 unprocessable_entity!(
847 "Invalid replication condition: Operator '$not-exist' not supported"
848 ),
849 "Should not update replication with invalid when condition"
850 );
851
852 assert!(repo.get_info("test").await.is_ok(), "Was not removed");
853 }
854
855 #[rstest]
856 #[tokio::test]
857 async fn test_update_replication_keep_dst_token_if_not_set(
858 #[future] mut repo: ReplicationRepository,
859 settings: ReplicationSettings,
860 ) {
861 let mut repo = repo.await;
862 repo.create_replication("test", settings.clone())
863 .await
864 .unwrap();
865
866 let mut updated = settings;
867 updated.dst_bucket = "bucket-3".to_string();
868 updated.dst_token = None;
869 repo.update_replication("test", updated).await.unwrap();
870
871 let replication = repo.get_replication_settings("test").await.unwrap();
872 assert_eq!(replication.dst_bucket, "bucket-3");
873 assert_eq!(replication.dst_token, Some("token".to_string()));
874 }
875 }
876
877 mod remove {
878 use super::*;
879 #[rstest]
880 #[tokio::test]
881 async fn test_remove_replication(
882 #[future] mut repo: ReplicationRepository,
883 #[future] storage: Arc<StorageEngine>,
884 settings: ReplicationSettings,
885 ) {
886 let mut repo = repo.await;
887 let storage = storage.await;
888 repo.create_replication("test", settings.clone())
889 .await
890 .unwrap();
891
892 repo.remove_replication("test").await.unwrap();
893 assert_eq!(repo.replications().await.unwrap().len(), 0);
894
895 let repo =
897 ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
898 assert_eq!(
899 repo.replications().await.unwrap().len(),
900 0,
901 "Should remove replication permanently"
902 );
903 }
904
905 #[rstest]
906 #[tokio::test]
907 async fn test_remove_non_existing_replication(#[future] mut repo: ReplicationRepository) {
908 let mut repo = repo.await;
909 assert_eq!(
910 repo.remove_replication("test-2").await,
911 Err(not_found!("Replication 'test-2' does not exist")),
912 "Should not remove non existing replication"
913 );
914 }
915
916 #[rstest]
917 #[tokio::test]
918 async fn test_remove_provisioned_replication(
919 #[future] mut repo: ReplicationRepository,
920 settings: ReplicationSettings,
921 ) {
922 let mut repo = repo.await;
923 repo.create_replication("test", settings.clone())
924 .await
925 .unwrap();
926
927 repo.set_replication_provisioned("test", true)
928 .await
929 .unwrap();
930
931 assert_eq!(
932 repo.remove_replication("test").await,
933 Err(conflict!("Can't remove provisioned replication 'test'")),
934 "Should not remove provisioned replication"
935 );
936 }
937 }
938
939 mod get {
940 use super::*;
941 use reduct_base::io::RecordMeta;
942
943 #[rstest]
944 #[tokio::test]
945 async fn test_get_replication(
946 #[future] mut repo: ReplicationRepository,
947 settings: ReplicationSettings,
948 ) {
949 let mut repo = repo.await;
950 repo.create_replication("test", settings.clone())
951 .await
952 .unwrap();
953 {
954 repo.notify(TransactionNotification {
955 bucket: "bucket-1".to_string(),
956 entry: "entry-1".to_string(),
957 meta: RecordMeta::builder().build(),
958 event: WriteRecord(0),
959 })
960 .await
961 .unwrap();
962 sleep(Duration::from_millis(100)).await;
963 }
964
965 let info = repo.get_info("test").await.unwrap();
966 assert_eq!(info.settings.src_bucket, settings.src_bucket);
967 assert_eq!(info.info.name, "test");
968 }
969
970 #[rstest]
971 #[tokio::test]
972 async fn test_get_non_existing_replication(#[future] repo: ReplicationRepository) {
973 let repo = repo.await;
974 assert_eq!(
975 repo.get_info("test-2").await.err(),
976 Some(not_found!("Replication 'test-2' does not exist")),
977 "Should not get non existing replication"
978 );
979 assert_eq!(
980 repo.get_replication_settings("test-2").await.err(),
981 Some(not_found!("Replication 'test-2' does not exist")),
982 "Should not get settings for non existing replication"
983 );
984 assert_eq!(
985 repo.is_replication_running("test-2").await.err(),
986 Some(not_found!("Replication 'test-2' does not exist")),
987 "Should not get running state for non existing replication"
988 );
989 }
990
991 #[rstest]
992 #[tokio::test]
993 async fn test_get_mut_non_existing_replication(#[future] mut repo: ReplicationRepository) {
994 let mut repo = repo.await;
995 assert_eq!(
996 repo.set_replication_provisioned("test-2", true).await.err(),
997 Some(not_found!("Replication 'test-2' does not exist")),
998 "Should not get non existing replication"
999 );
1000 }
1001 }
1002
1003 mod notify {
1004 use super::*;
1005 use reduct_base::io::RecordMeta;
1006 use tokio::time::{sleep, Duration};
1007
1008 #[rstest]
1009 #[tokio::test]
1010 async fn test_notify_replication(
1011 #[future] mut repo: ReplicationRepository,
1012 settings: ReplicationSettings,
1013 ) {
1014 let mut repo = repo.await;
1015 repo.create_replication("test", settings.clone())
1016 .await
1017 .unwrap();
1018
1019 let notification = TransactionNotification {
1020 bucket: "bucket-1".to_string(),
1021 entry: "entry-1".to_string(),
1022 meta: RecordMeta::builder().build(),
1023 event: WriteRecord(0),
1024 };
1025
1026 repo.notify(notification.clone()).await.unwrap();
1027 sleep(Duration::from_millis(50)).await;
1028 assert_eq!(repo.get_info("test").await.unwrap().info.pending_records, 1);
1029 }
1030
1031 #[rstest]
1032 #[tokio::test]
1033 async fn test_notify_wrong_bucket(
1034 #[future] mut repo: ReplicationRepository,
1035 settings: ReplicationSettings,
1036 ) {
1037 let mut repo = repo.await;
1038 repo.create_replication("test", settings.clone())
1039 .await
1040 .unwrap();
1041
1042 let notification = TransactionNotification {
1043 bucket: "bucket-2".to_string(),
1044 entry: "entry-1".to_string(),
1045 meta: RecordMeta::builder().build(),
1046 event: WriteRecord(0),
1047 };
1048
1049 repo.notify(notification).await.unwrap();
1050 sleep(Duration::from_millis(50)).await;
1051 assert_eq!(
1052 repo.get_info("test").await.unwrap().info.pending_records,
1053 0,
1054 "Should not notify replication for wrong bucket"
1055 );
1056 }
1057
1058 #[rstest]
1059 #[tokio::test]
1060 async fn test_notify_after_stop_returns_error(
1061 #[future] mut repo: ReplicationRepository,
1062 settings: ReplicationSettings,
1063 ) {
1064 let mut repo = repo.await;
1065 repo.create_replication("test", settings).await.unwrap();
1066 repo.stop().await;
1067
1068 let notification = TransactionNotification {
1069 bucket: "bucket-1".to_string(),
1070 entry: "entry-1".to_string(),
1071 meta: RecordMeta::builder().build(),
1072 event: WriteRecord(0),
1073 };
1074
1075 assert_eq!(
1076 repo.notify(notification).await.err(),
1077 Some(internal_server_error!(
1078 "Failed to enqueue replication notification"
1079 ))
1080 );
1081 }
1082
1083 #[rstest]
1084 #[tokio::test]
1085 async fn test_notify_skips_non_matching_replications(
1086 #[future] mut repo: ReplicationRepository,
1087 settings: ReplicationSettings,
1088 ) {
1089 let mut repo = repo.await;
1090 let mut settings_1 = settings.clone();
1091 settings_1.src_bucket = "bucket-1".to_string();
1092 repo.create_replication("test-1", settings_1).await.unwrap();
1093
1094 repo.storage
1095 .create_bucket("bucket-2", BucketSettings::default())
1096 .await
1097 .unwrap();
1098 let mut settings_2 = settings;
1099 settings_2.src_bucket = "bucket-2".to_string();
1100 settings_2.dst_bucket = "bucket-1".to_string();
1101 repo.create_replication("test-2", settings_2).await.unwrap();
1102
1103 let notification = TransactionNotification {
1104 bucket: "bucket-1".to_string(),
1105 entry: "entry-1".to_string(),
1106 meta: RecordMeta::builder().build(),
1107 event: WriteRecord(0),
1108 };
1109
1110 repo.notify(notification).await.unwrap();
1111 sleep(Duration::from_millis(50)).await;
1112 assert_eq!(
1113 repo.get_info("test-1").await.unwrap().info.pending_records,
1114 1
1115 );
1116 assert_eq!(
1117 repo.get_info("test-2").await.unwrap().info.pending_records,
1118 0
1119 );
1120 }
1121
1122 #[rstest]
1123 #[tokio::test]
1124 #[serial]
1125 async fn test_notify_worker_lock_timeout(
1126 #[future] mut repo: ReplicationRepository,
1127 settings: ReplicationSettings,
1128 ) {
1129 struct ResetGuard;
1130 impl Drop for ResetGuard {
1131 fn drop(&mut self) {
1132 reset_rwlock_config();
1133 }
1134 }
1135 let _reset = ResetGuard;
1136
1137 let mut repo = repo.await;
1138 repo.create_replication("test", settings).await.unwrap();
1139
1140 set_rwlock_timeout(Duration::from_millis(20));
1141 let replications = Arc::clone(&repo.replications);
1142 let guard = replications.read().await.unwrap();
1143
1144 let notification = TransactionNotification {
1145 bucket: "bucket-1".to_string(),
1146 entry: "entry-1".to_string(),
1147 meta: RecordMeta::builder().build(),
1148 event: WriteRecord(0),
1149 };
1150 repo.notify(notification).await.unwrap();
1151 sleep(Duration::from_millis(50)).await;
1152 drop(guard);
1153
1154 assert_eq!(repo.get_info("test").await.unwrap().info.pending_records, 0);
1155 }
1156 }
1157
1158 mod start {
1159 use super::*;
1160
1161 #[rstest]
1162 #[tokio::test]
1163 async fn test_start_all(
1164 #[future] mut repo: ReplicationRepository,
1165 settings: ReplicationSettings,
1166 ) {
1167 let mut repo = repo.await;
1168 repo.create_replication("test-1", settings.clone())
1169 .await
1170 .unwrap();
1171 repo.create_replication("test-2", settings.clone())
1172 .await
1173 .unwrap();
1174
1175 repo.start();
1176 assert!(
1177 repo.is_replication_running("test-1").await.unwrap(),
1178 "Replication 'test-1' should be running"
1179 );
1180 assert!(
1181 repo.is_replication_running("test-2").await.unwrap(),
1182 "Replication 'test-2' should be running"
1183 );
1184 }
1185
1186 #[rstest]
1187 #[tokio::test]
1188 async fn test_double_start(
1189 #[future] mut repo: ReplicationRepository,
1190 settings: ReplicationSettings,
1191 ) {
1192 let mut repo = repo.await;
1193 repo.create_replication("test-1", settings.clone())
1194 .await
1195 .unwrap();
1196
1197 repo.start();
1198 repo.start(); assert!(
1201 repo.is_replication_running("test-1").await.unwrap(),
1202 "Replication 'test-1' should be running"
1203 );
1204 }
1205
1206 #[rstest]
1207 #[tokio::test]
1208 async fn test_start_all_when_lock_contended(
1209 #[future] mut repo: ReplicationRepository,
1210 settings: ReplicationSettings,
1211 ) {
1212 let mut repo = repo.await;
1213 repo.create_replication("test-1", settings.clone())
1214 .await
1215 .unwrap();
1216 repo.create_replication("test-2", settings).await.unwrap();
1217
1218 let replications = Arc::clone(&repo.replications);
1219 let guard = replications.write().await.unwrap();
1220 repo.start();
1221 drop(guard);
1222 sleep(Duration::from_millis(50)).await;
1223
1224 assert!(repo.is_replication_running("test-1").await.unwrap());
1225 assert!(repo.is_replication_running("test-2").await.unwrap());
1226 }
1227
1228 #[rstest]
1229 #[tokio::test]
1230 #[serial]
1231 async fn test_start_all_lock_timeout(
1232 #[future] mut repo: ReplicationRepository,
1233 settings: ReplicationSettings,
1234 ) {
1235 struct ResetGuard;
1236 impl Drop for ResetGuard {
1237 fn drop(&mut self) {
1238 reset_rwlock_config();
1239 }
1240 }
1241 let _reset = ResetGuard;
1242
1243 let mut repo = repo.await;
1244 repo.create_replication("test-1", settings).await.unwrap();
1245
1246 set_rwlock_timeout(Duration::from_millis(20));
1247 let replications = Arc::clone(&repo.replications);
1248 let guard = replications.read().await.unwrap();
1249 repo.start();
1250 sleep(Duration::from_millis(50)).await;
1251 drop(guard);
1252
1253 assert!(!repo.is_replication_running("test-1").await.unwrap());
1254 }
1255 }
1256
1257 mod set_mode {
1258 use super::*;
1259
1260 #[rstest]
1261 #[tokio::test]
1262 async fn test_set_mode(
1263 #[future] mut repo: ReplicationRepository,
1264 settings: ReplicationSettings,
1265 ) {
1266 let mut repo = repo.await;
1267 repo.create_replication("test-1", settings.clone())
1268 .await
1269 .unwrap();
1270 repo.set_mode("test-1", ReplicationMode::Paused)
1271 .await
1272 .unwrap();
1273
1274 assert_eq!(
1275 repo.get_info("test-1").await.unwrap().info.mode,
1276 ReplicationMode::Paused
1277 );
1278 }
1279
1280 #[rstest]
1281 #[tokio::test]
1282 async fn test_set_mode_non_existing(#[future] mut repo: ReplicationRepository) {
1283 let mut repo = repo.await;
1284 assert_eq!(
1285 repo.set_mode("test-1", ReplicationMode::Paused).await,
1286 Err(not_found!("Replication 'test-1' does not exist"))
1287 );
1288 }
1289 }
1290
1291 #[fixture]
1292 fn settings() -> ReplicationSettings {
1293 ReplicationSettings {
1294 src_bucket: "bucket-1".to_string(),
1295 dst_bucket: "bucket-2".to_string(),
1296 dst_host: "http://localhost".to_string(),
1297 dst_token: Some("token".to_string()),
1298 entries: vec!["entry-1".to_string()],
1299 include: Labels::default(),
1300 exclude: Labels::default(),
1301 each_n: None,
1302 each_s: None,
1303 when: None,
1304 mode: ReplicationMode::Enabled,
1305 }
1306 }
1307
1308 #[fixture]
1309 async fn storage() -> Arc<StorageEngine> {
1310 let tmp_dir = tempfile::tempdir().unwrap();
1311 let cfg = Cfg {
1312 data_path: tmp_dir.keep(),
1313 ..Cfg::default()
1314 };
1315 let storage = StorageEngine::builder()
1316 .with_data_path(cfg.data_path.clone())
1317 .with_cfg(cfg)
1318 .build()
1319 .await;
1320
1321 let bucket = storage
1322 .create_bucket("bucket-1", BucketSettings::default())
1323 .await
1324 .unwrap()
1325 .upgrade_and_unwrap();
1326 let _ = bucket.get_or_create_entry("entry-1").await.unwrap();
1327 Arc::new(storage)
1328 }
1329
1330 #[fixture]
1331 async fn repo(#[future] storage: Arc<StorageEngine>) -> ReplicationRepository {
1332 let storage = storage.await;
1333 ReplicationRepository::load_or_create(storage, Cfg::default()).await
1334 }
1335}