Skip to main content

reductstore/replication/replication_repository/
repo.rs

1// Copyright 2023-2026 ReductSoftware UG
2// Licensed under the Business Source License 1.1
3
4use crate::cfg::{Cfg, DEFAULT_PORT};
5use crate::core::file_cache::FILE_CACHE;
6use crate::core::sync::AsyncRwLock;
7use crate::replication::proto::replication_repo::Item;
8use crate::replication::proto::{
9    Label as ProtoLabel, ReplicationMode as ProtoReplicationMode,
10    ReplicationRepo as ProtoReplicationRepo, ReplicationSettings as ProtoReplicationSettings,
11};
12use crate::replication::replication_task::ReplicationTask;
13use crate::replication::{ManageReplications, TransactionNotification};
14use crate::storage::engine::StorageEngine;
15use crate::storage::query::condition::Parser;
16use crate::storage::query::filters::WhenFilter;
17use async_trait::async_trait;
18use bytes::Bytes;
19use log::{debug, error, warn};
20use prost::Message;
21use reduct_base::error::ReductError;
22use reduct_base::msg::replication_api::{
23    FullReplicationInfo, ReplicationInfo, ReplicationMode, ReplicationSettings,
24};
25use reduct_base::{not_found, unprocessable_entity};
26use std::collections::HashMap;
27use std::convert::TryFrom;
28use std::io::SeekFrom::Start;
29use std::io::{Read, Write};
30use std::path::PathBuf;
31use std::sync::Arc;
32use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
33use tokio::task::JoinHandle;
34use url::Url;
35
36const REPLICATION_REPO_FILE_NAME: &str = ".replications";
37
38impl From<ReplicationSettings> for ProtoReplicationSettings {
39    fn from(settings: ReplicationSettings) -> Self {
40        Self {
41            src_bucket: settings.src_bucket,
42            dst_bucket: settings.dst_bucket,
43            dst_host: settings.dst_host,
44            dst_token: settings.dst_token.unwrap_or_default(),
45            entries: settings.entries,
46            include: settings
47                .include
48                .into_iter()
49                .map(|(k, v)| ProtoLabel { name: k, value: v })
50                .collect(),
51            exclude: settings
52                .exclude
53                .into_iter()
54                .map(|(k, v)| ProtoLabel { name: k, value: v })
55                .collect(),
56            each_s: settings.each_s.unwrap_or(0.0),
57            each_n: settings.each_n.unwrap_or(0),
58            when: settings.when.map(|value| value.to_string()),
59            mode: ProtoReplicationMode::from(&settings.mode) as i32,
60        }
61    }
62}
63
64impl From<ProtoReplicationSettings> for ReplicationSettings {
65    fn from(settings: ProtoReplicationSettings) -> Self {
66        Self {
67            src_bucket: settings.src_bucket,
68            dst_bucket: settings.dst_bucket,
69            dst_host: settings.dst_host,
70            dst_token: if settings.dst_token.is_empty() {
71                None
72            } else {
73                Some(settings.dst_token)
74            },
75            entries: settings.entries,
76            include: settings
77                .include
78                .into_iter()
79                .map(|label| (label.name, label.value))
80                .collect(),
81            exclude: settings
82                .exclude
83                .into_iter()
84                .map(|label| (label.name, label.value))
85                .collect(),
86            each_s: if settings.each_s > 0.0 {
87                Some(settings.each_s)
88            } else {
89                None
90            },
91            each_n: if settings.each_n > 0 {
92                Some(settings.each_n)
93            } else {
94                None
95            },
96            when: if let Some(when) = settings.when {
97                match serde_json::from_str(&when) {
98                    Ok(value) => Some(value),
99                    Err(err) => {
100                        error!(
101                            "Failed to parse 'when' field: {} in replication settings: {}",
102                            err, when
103                        );
104                        None
105                    }
106                }
107            } else {
108                None
109            },
110            mode: ProtoReplicationMode::try_from(settings.mode)
111                .unwrap_or(ProtoReplicationMode::Enabled)
112                .into(),
113        }
114    }
115}
116
117impl From<&ReplicationMode> for ProtoReplicationMode {
118    fn from(mode: &ReplicationMode) -> Self {
119        match mode {
120            ReplicationMode::Enabled => ProtoReplicationMode::Enabled,
121            ReplicationMode::Paused => ProtoReplicationMode::Paused,
122            ReplicationMode::Disabled => ProtoReplicationMode::Disabled,
123        }
124    }
125}
126
127impl From<ProtoReplicationMode> for ReplicationMode {
128    fn from(mode: ProtoReplicationMode) -> Self {
129        match mode {
130            ProtoReplicationMode::Enabled => ReplicationMode::Enabled,
131            ProtoReplicationMode::Paused => ReplicationMode::Paused,
132            ProtoReplicationMode::Disabled => ReplicationMode::Disabled,
133        }
134    }
135}
136
137/// A repository for managing replications from HTTP API
138
139enum NotificationCommand {
140    Notify(TransactionNotification),
141    Stop,
142}
143
144pub(crate) struct ReplicationRepository {
145    replications: Arc<AsyncRwLock<HashMap<String, ReplicationTask>>>,
146    storage: Arc<StorageEngine>,
147    repo_path: PathBuf,
148    config: Cfg,
149    started: bool,
150    notification_tx: UnboundedSender<NotificationCommand>,
151    notification_worker: Option<JoinHandle<()>>,
152}
153
154#[async_trait]
155impl ManageReplications for ReplicationRepository {
156    async fn create_replication(
157        &mut self,
158        name: &str,
159        settings: ReplicationSettings,
160    ) -> Result<(), ReductError> {
161        // check if replication already exists
162        if self.replications.read().await?.contains_key(name) {
163            return Err(ReductError::conflict(&format!(
164                "Replication '{}' already exists",
165                name
166            )));
167        }
168
169        self.create_or_update_replication_task(&name, settings)
170            .await
171    }
172
173    async fn update_replication(
174        &mut self,
175        name: &str,
176        settings: ReplicationSettings,
177    ) -> Result<(), ReductError> {
178        // check if replication exists and not provisioned
179        match self.replications.read().await?.get(name) {
180            Some(replication) => {
181                if replication.is_provisioned() {
182                    Err(ReductError::conflict(&format!(
183                        "Can't update provisioned replication '{}'",
184                        name
185                    )))
186                } else {
187                    Ok(())
188                }
189            }
190            None => Err(ReductError::not_found(&format!(
191                "Replication '{}' does not exist",
192                name
193            ))),
194        }?;
195
196        self.create_or_update_replication_task(&name, settings)
197            .await
198    }
199
200    async fn replications(&self) -> Result<Vec<ReplicationInfo>, ReductError> {
201        let mut replications = Vec::new();
202        let guard = self.replications.read().await?;
203        for (_, replication) in guard.iter() {
204            replications.push(replication.info().await?);
205        }
206        Ok(replications)
207    }
208
209    async fn get_info(&self, name: &str) -> Result<FullReplicationInfo, ReductError> {
210        let guard = self.replications.read().await?;
211        let replication = guard.get(name).ok_or_else(|| {
212            ReductError::not_found(&format!("Replication '{}' does not exist", name))
213        })?;
214        let info = FullReplicationInfo {
215            info: replication.info().await?,
216            settings: replication.masked_settings().clone(),
217            diagnostics: replication.diagnostics().await?,
218        };
219        Ok(info)
220    }
221
222    async fn get_replication_settings(
223        &self,
224        name: &str,
225    ) -> Result<ReplicationSettings, ReductError> {
226        let guard = self.replications.read().await?;
227        guard
228            .get(name)
229            .map(|replication| replication.settings().clone())
230            .ok_or_else(|| {
231                ReductError::not_found(&format!("Replication '{}' does not exist", name))
232            })
233    }
234
235    async fn is_replication_running(&self, name: &str) -> Result<bool, ReductError> {
236        let guard = self.replications.read().await?;
237        guard
238            .get(name)
239            .map(|replication| replication.is_running())
240            .ok_or_else(|| {
241                ReductError::not_found(&format!("Replication '{}' does not exist", name))
242            })
243    }
244
245    async fn set_replication_provisioned(
246        &mut self,
247        name: &str,
248        provisioned: bool,
249    ) -> Result<(), ReductError> {
250        let mut guard = self.replications.write().await?;
251        let replication = guard.get_mut(name).ok_or_else(|| {
252            ReductError::not_found(&format!("Replication '{}' does not exist", name))
253        })?;
254        replication.set_provisioned(provisioned);
255        Ok(())
256    }
257
258    async fn remove_replication(&mut self, name: &str) -> Result<(), ReductError> {
259        let mut guard = self.replications.write().await?;
260        let repl = guard.get(name).ok_or_else(|| {
261            ReductError::not_found(&format!("Replication '{}' does not exist", name))
262        })?;
263        if repl.is_provisioned() {
264            return Err(ReductError::conflict(&format!(
265                "Can't remove provisioned replication '{}'",
266                name
267            )));
268        }
269        let removed = guard.remove(name);
270        drop(guard);
271        if let Some(mut repl) = removed {
272            repl.stop().await;
273        }
274        self.save_repo().await
275    }
276
277    async fn set_mode(&mut self, name: &str, mode: ReplicationMode) -> Result<(), ReductError> {
278        let mut guard = self.replications.write().await?;
279        let replication = guard.get_mut(name).ok_or_else(|| {
280            ReductError::not_found(&format!("Replication '{}' does not exist", name))
281        })?;
282        replication.set_mode(mode);
283        drop(guard);
284        self.save_repo().await
285    }
286
287    async fn notify(&mut self, notification: TransactionNotification) -> Result<(), ReductError> {
288        let should_enqueue = {
289            let guard = self.replications.read().await?;
290            guard
291                .iter()
292                .any(|(_, replication)| replication.settings().src_bucket == notification.bucket)
293        };
294        if should_enqueue {
295            self.notification_tx
296                .send(NotificationCommand::Notify(notification))
297                .map_err(|_| {
298                    ReductError::internal_server_error("Failed to enqueue replication notification")
299                })?;
300        }
301        Ok(())
302    }
303
304    fn start(&mut self) {
305        self.start_all();
306    }
307
308    async fn stop(&mut self) {
309        let _ = self.notification_tx.send(NotificationCommand::Stop);
310        if let Some(worker) = self.notification_worker.take() {
311            if let Err(err) = worker.await {
312                error!("Failed to join replication notification worker: {:?}", err);
313            }
314        }
315
316        let mut guard = self.replications.write().await.unwrap();
317        for (_, task) in guard.iter_mut() {
318            task.stop().await;
319        }
320    }
321}
322
323impl ReplicationRepository {
324    pub(crate) async fn load_or_create(storage: Arc<StorageEngine>, config: Cfg) -> Self {
325        let repo_path = storage.data_path().join(REPLICATION_REPO_FILE_NAME);
326        let replications = Arc::new(AsyncRwLock::new(HashMap::<String, ReplicationTask>::new()));
327        let (notification_tx, mut notification_rx) = unbounded_channel::<NotificationCommand>();
328        let worker_replications = Arc::clone(&replications);
329        let notification_worker = tokio::spawn(async move {
330            while let Some(command) = notification_rx.recv().await {
331                match command {
332                    NotificationCommand::Notify(notification) => {
333                        let mut replications = match worker_replications.write().await {
334                            Ok(guard) => guard,
335                            Err(err) => {
336                                error!("Failed to lock replication map: {}", err);
337                                continue;
338                            }
339                        };
340
341                        for (_, replication) in replications.iter_mut() {
342                            if replication.settings().src_bucket != notification.bucket {
343                                continue;
344                            }
345
346                            if let Err(err) = replication.notify(notification.clone()).await {
347                                error!("Failed to notify replication task: {}", err);
348                            }
349                        }
350                    }
351                    NotificationCommand::Stop => break,
352                }
353            }
354        });
355
356        let mut repo = Self {
357            replications,
358            storage,
359            repo_path,
360            config,
361            started: false,
362            notification_tx,
363            notification_worker: Some(notification_worker),
364        };
365
366        let read_conf_file = async || {
367            let mut lock = FILE_CACHE
368                .write_or_create(&repo.repo_path, Start(0))
369                .await?;
370
371            let mut buf = Vec::new();
372            lock.read_to_end(&mut buf)?;
373            Ok::<Vec<u8>, ReductError>(buf)
374        };
375
376        match read_conf_file().await {
377            Ok(buf) => {
378                debug!(
379                    "Reading replication repository from {}",
380                    repo.repo_path.as_os_str().to_str().unwrap_or("...")
381                );
382                let proto_repo = ProtoReplicationRepo::decode(&mut Bytes::from(buf))
383                    .expect("Error decoding replication repository");
384                for item in proto_repo.replications {
385                    if let Err(err) = repo
386                        .create_replication(&item.name, item.settings.unwrap().into())
387                        .await
388                    {
389                        error!("Failed to load replication '{}': {}", item.name, err);
390                    }
391                }
392            }
393            Err(err) => {
394                warn!(
395                    "Failed to read replication repository from {}: {}",
396                    repo.repo_path.as_os_str().to_str().unwrap_or("..."),
397                    err
398                );
399            }
400        }
401        repo
402    }
403
404    async fn save_repo(&self) -> Result<(), ReductError> {
405        let replications = self.replications.read().await?;
406        let proto_repo = ProtoReplicationRepo {
407            replications: replications
408                .iter()
409                .map(|(name, replication)| Item {
410                    name: name.clone(),
411                    settings: Some(replication.settings().clone().into()),
412                })
413                .collect(),
414        };
415
416        let mut buf = Vec::new();
417        proto_repo
418            .encode(&mut buf)
419            .expect("Error encoding replication repository");
420
421        let mut file = FILE_CACHE
422            .write_or_create(&self.repo_path, Start(0))
423            .await?;
424        file.set_len(0)?;
425        file.write_all(&buf)?;
426        file.sync_all().await?;
427
428        Ok(())
429    }
430
431    async fn create_or_update_replication_task(
432        &mut self,
433        name: &str,
434        settings: ReplicationSettings,
435    ) -> Result<(), ReductError> {
436        // check if destination host is valid
437        let dest_url = match Url::parse(&settings.dst_host) {
438            Ok(url) => url,
439
440            Err(_) => {
441                return Err(unprocessable_entity!(
442                    "Invalid destination host '{}'",
443                    settings.dst_host
444                ))
445            }
446        };
447
448        // check if source bucket exists
449        if self.storage.get_bucket(&settings.src_bucket).await.is_err() {
450            return Err(not_found!(
451                "Source bucket '{}' for replication '{}' does not exist",
452                settings.src_bucket,
453                name
454            ));
455        }
456
457        // check if target and source buckets are the same
458        if settings.src_bucket == settings.dst_bucket
459            && self.config.replication_conf.listening_port
460                == dest_url.port_or_known_default().unwrap_or(DEFAULT_PORT)
461            && ["127.0.0.1", "localhost", "0.0.0.0"].contains(&dest_url.host_str().unwrap_or(""))
462        {
463            return Err(unprocessable_entity!(
464                "Source and destination buckets must be different",
465            ));
466        }
467
468        // check syntax of when condition
469        let mut conf = self.config.clone();
470        if let Some(when) = &settings.when {
471            let (cond, directives) = match Parser::new().parse(when.clone()) {
472                Ok((cond, dirs)) => (cond, dirs),
473                Err(err) => {
474                    return Err(unprocessable_entity!(
475                        "Invalid replication condition: {}",
476                        err.message
477                    ))
478                }
479            };
480
481            let filer = WhenFilter::<TransactionNotification>::try_new(
482                cond,
483                directives,
484                self.config.io_conf.clone(),
485                true,
486            )?;
487            conf.io_conf = filer.io_config().clone();
488        }
489
490        // remove old replication because before creating new one
491        let mut removed = self.replications.write().await?.remove(name);
492
493        // we keep the old token if the new one is empty (meaning not updated)
494        let init_token = settings.dst_token.clone().or_else(|| {
495            removed
496                .as_ref()
497                .and_then(|r| r.settings().dst_token.clone())
498        });
499
500        if let Some(mut old) = removed.take() {
501            old.stop().await;
502        }
503
504        let mut settings = settings;
505        settings.dst_token = init_token;
506
507        let replication =
508            ReplicationTask::new(name.to_string(), settings, conf, Arc::clone(&self.storage));
509        let mut replication = replication;
510        if self.started {
511            replication.start();
512        }
513        self.replications
514            .write()
515            .await?
516            .insert(name.to_string(), replication);
517        self.save_repo().await
518    }
519}
520
521impl ReplicationRepository {
522    pub(crate) fn start_all(&mut self) {
523        if self.started {
524            return;
525        }
526
527        if let Some(mut replications) = self.replications.try_write() {
528            for (_, task) in replications.iter_mut() {
529                task.start();
530            }
531        } else {
532            let replications = Arc::clone(&self.replications);
533            tokio::spawn(async move {
534                let mut replications = match replications.write().await {
535                    Ok(guard) => guard,
536                    Err(err) => {
537                        error!("Failed to lock replication map: {}", err);
538                        return;
539                    }
540                };
541                for (_, task) in replications.iter_mut() {
542                    task.start();
543                }
544            });
545        }
546        self.started = true;
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553    use crate::core::sync::{reset_rwlock_config, set_rwlock_timeout};
554    use crate::replication::Transaction::WriteRecord;
555    use reduct_base::msg::bucket_api::BucketSettings;
556    use reduct_base::{conflict, internal_server_error, not_found, Labels};
557    use rstest::*;
558    use serial_test::serial;
559    use tokio::time::{sleep, Duration};
560
561    mod create {
562        use super::*;
563        #[rstest]
564        #[tokio::test]
565        async fn create_replication(
566            #[future] mut repo: ReplicationRepository,
567            settings: ReplicationSettings,
568        ) {
569            let mut repo = repo.await;
570            repo.create_replication("test", settings.clone())
571                .await
572                .unwrap();
573
574            let repls = repo.replications().await.unwrap();
575            assert_eq!(repls.len(), 1);
576            assert_eq!(repls[0].name, "test");
577            assert_eq!(
578                repo.get_replication_settings("test").await.unwrap(),
579                settings,
580                "Should create replication with the same name and settings"
581            );
582        }
583
584        #[rstest]
585        #[tokio::test]
586        async fn create_replication_with_same_name(
587            #[future] mut repo: ReplicationRepository,
588            settings: ReplicationSettings,
589        ) {
590            let mut repo = repo.await;
591            repo.create_replication("test", settings.clone())
592                .await
593                .unwrap();
594
595            assert_eq!(
596                repo.create_replication("test", settings).await,
597                Err(conflict!("Replication 'test' already exists")),
598                "Should not create replication with the same name"
599            );
600        }
601
602        #[rstest]
603        #[tokio::test]
604        async fn create_replication_with_invalid_url(
605            #[future] mut repo: ReplicationRepository,
606            settings: ReplicationSettings,
607        ) {
608            let mut repo = repo.await;
609            let mut settings = settings;
610            settings.dst_host = "invalid_url".to_string();
611
612            assert_eq!(
613                repo.create_replication("test", settings).await,
614                Err(unprocessable_entity!(
615                    "Invalid destination host 'invalid_url'"
616                )),
617                "Should not create replication with invalid url"
618            );
619        }
620
621        #[rstest]
622        #[tokio::test]
623        async fn create_replication_to_same_bucket(
624            #[future] mut repo: ReplicationRepository,
625            settings: ReplicationSettings,
626        ) {
627            let mut repo = repo.await;
628            let mut settings = settings;
629            settings.dst_host = format!("http://localhost:{}", DEFAULT_PORT);
630            settings.dst_bucket = "bucket-1".to_string();
631
632            assert_eq!(
633                repo.create_replication("test", settings).await,
634                Err(unprocessable_entity!(
635                    "Source and destination buckets must be different"
636                )),
637                "Should not create replication to the same bucket"
638            );
639        }
640
641        #[rstest]
642        #[tokio::test]
643        async fn test_replication_src_bucket_not_found(
644            #[future] mut repo: ReplicationRepository,
645            mut settings: ReplicationSettings,
646        ) {
647            let mut repo = repo.await;
648            settings.src_bucket = "bucket-2".to_string();
649            assert_eq!(
650                repo.create_replication("test", settings).await,
651                Err(not_found!(
652                    "Source bucket 'bucket-2' for replication 'test' does not exist"
653                )),
654                "Should not create replication with non existing source bucket"
655            );
656        }
657
658        #[rstest]
659        #[tokio::test]
660        async fn test_replication_with_invalid_when_condition(
661            #[future] mut repo: ReplicationRepository,
662            settings: ReplicationSettings,
663        ) {
664            let mut repo = repo.await;
665            let mut settings = settings;
666            settings.when = Some(serde_json::json!({"$UNKNOWN_OP": ["&x", "y"]}));
667            assert_eq!(
668                repo.create_replication("test", settings).await,
669                Err(unprocessable_entity!(
670                    "Invalid replication condition: Operator '$UNKNOWN_OP' not supported"
671                )),
672                "Should not create replication with invalid when condition"
673            );
674        }
675
676        #[rstest]
677        #[tokio::test]
678        async fn create_and_load_replications(
679            #[future] storage: Arc<StorageEngine>,
680            settings: ReplicationSettings,
681        ) {
682            let storage = storage.await;
683            let mut repo =
684                ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
685            repo.create_replication("test", settings.clone())
686                .await
687                .unwrap();
688
689            let repo =
690                ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
691            assert_eq!(repo.replications().await.unwrap().len(), 1);
692            assert_eq!(
693                repo.get_replication_settings("test").await.unwrap(),
694                settings,
695                "Should load replication from file"
696            );
697        }
698    }
699
700    mod update {
701        use super::*;
702        #[rstest]
703        #[tokio::test]
704        async fn test_update_replication(
705            #[future] mut repo: ReplicationRepository,
706            settings: ReplicationSettings,
707        ) {
708            let mut repo = repo.await;
709            repo.create_replication("test", settings.clone())
710                .await
711                .unwrap();
712
713            let mut settings = settings;
714            settings.dst_bucket = "bucket-3".to_string();
715            repo.update_replication("test", settings.clone())
716                .await
717                .unwrap();
718
719            let replication = repo.get_replication_settings("test").await.unwrap();
720            assert_eq!(replication.dst_bucket, "bucket-3");
721        }
722
723        #[rstest]
724        #[tokio::test]
725        async fn test_update_provisioned_replication(
726            #[future] mut repo: ReplicationRepository,
727            settings: ReplicationSettings,
728        ) {
729            let mut repo = repo.await;
730            repo.create_replication("test", settings.clone())
731                .await
732                .unwrap();
733
734            repo.set_replication_provisioned("test", true)
735                .await
736                .unwrap();
737
738            assert_eq!(
739                repo.update_replication("test", settings).await,
740                Err(conflict!("Can't update provisioned replication 'test'")),
741                "Should not update provisioned replication"
742            );
743        }
744
745        #[rstest]
746        #[tokio::test]
747        async fn test_update_non_existing_replication(#[future] mut repo: ReplicationRepository) {
748            let mut repo = repo.await;
749            assert_eq!(
750                repo.update_replication("test-2", ReplicationSettings::default())
751                    .await,
752                Err(not_found!("Replication 'test-2' does not exist")),
753                "Should not update non existing replication"
754            );
755        }
756
757        #[rstest]
758        #[tokio::test]
759        async fn test_update_replication_with_invalid_url(
760            #[future] mut repo: ReplicationRepository,
761            settings: ReplicationSettings,
762        ) {
763            let mut repo = repo.await;
764            repo.create_replication("test", settings.clone())
765                .await
766                .unwrap();
767
768            let mut settings = settings;
769            settings.dst_host = "invalid_url".to_string();
770
771            assert_eq!(
772                repo.update_replication("test", settings).await,
773                Err(unprocessable_entity!(
774                    "Invalid destination host 'invalid_url'"
775                )),
776                "Should not update replication with invalid url"
777            );
778        }
779
780        #[rstest]
781        #[tokio::test]
782        async fn test_update_replication_to_same_bucket(
783            #[future] mut repo: ReplicationRepository,
784            settings: ReplicationSettings,
785        ) {
786            let mut repo = repo.await;
787            repo.create_replication("test", settings.clone())
788                .await
789                .unwrap();
790
791            let mut settings = settings;
792            settings.dst_host = format!("http://localhost:{}", DEFAULT_PORT);
793            settings.dst_bucket = "bucket-1".to_string();
794
795            assert_eq!(
796                repo.update_replication("test", settings).await,
797                Err(unprocessable_entity!(
798                    "Source and destination buckets must be different"
799                )),
800                "Should not update replication to the same bucket"
801            );
802        }
803
804        #[rstest]
805        #[tokio::test]
806        async fn test_update_replication_src_bucket_not_found(
807            #[future] mut repo: ReplicationRepository,
808            settings: ReplicationSettings,
809        ) {
810            let mut repo = repo.await;
811            repo.create_replication("test", settings.clone())
812                .await
813                .unwrap();
814
815            let mut settings = settings;
816            settings.src_bucket = "bucket-2".to_string();
817
818            assert_eq!(
819                repo.update_replication("test", settings).await,
820                Err(not_found!(
821                    "Source bucket 'bucket-2' for replication 'test' does not exist"
822                )),
823                "Should not update replication with non existing source bucket"
824            );
825        }
826
827        #[rstest]
828        #[tokio::test]
829        async fn test_remove_old_replication_only_for_valid(
830            #[future] mut repo: ReplicationRepository,
831            mut settings: ReplicationSettings,
832        ) {
833            let mut repo = repo.await;
834            repo.create_replication("test", settings.clone())
835                .await
836                .unwrap();
837            settings.when = Some(serde_json::json!({"$not-exist": [true, true]}));
838
839            let err = repo
840                .update_replication("test", settings)
841                .await
842                .err()
843                .unwrap();
844            assert_eq!(
845                err,
846                unprocessable_entity!(
847                    "Invalid replication condition: Operator '$not-exist' not supported"
848                ),
849                "Should not update replication with invalid when condition"
850            );
851
852            assert!(repo.get_info("test").await.is_ok(), "Was not removed");
853        }
854
855        #[rstest]
856        #[tokio::test]
857        async fn test_update_replication_keep_dst_token_if_not_set(
858            #[future] mut repo: ReplicationRepository,
859            settings: ReplicationSettings,
860        ) {
861            let mut repo = repo.await;
862            repo.create_replication("test", settings.clone())
863                .await
864                .unwrap();
865
866            let mut updated = settings;
867            updated.dst_bucket = "bucket-3".to_string();
868            updated.dst_token = None;
869            repo.update_replication("test", updated).await.unwrap();
870
871            let replication = repo.get_replication_settings("test").await.unwrap();
872            assert_eq!(replication.dst_bucket, "bucket-3");
873            assert_eq!(replication.dst_token, Some("token".to_string()));
874        }
875    }
876
877    mod remove {
878        use super::*;
879        #[rstest]
880        #[tokio::test]
881        async fn test_remove_replication(
882            #[future] mut repo: ReplicationRepository,
883            #[future] storage: Arc<StorageEngine>,
884            settings: ReplicationSettings,
885        ) {
886            let mut repo = repo.await;
887            let storage = storage.await;
888            repo.create_replication("test", settings.clone())
889                .await
890                .unwrap();
891
892            repo.remove_replication("test").await.unwrap();
893            assert_eq!(repo.replications().await.unwrap().len(), 0);
894
895            // check if replication is removed from file
896            let repo =
897                ReplicationRepository::load_or_create(Arc::clone(&storage), Cfg::default()).await;
898            assert_eq!(
899                repo.replications().await.unwrap().len(),
900                0,
901                "Should remove replication permanently"
902            );
903        }
904
905        #[rstest]
906        #[tokio::test]
907        async fn test_remove_non_existing_replication(#[future] mut repo: ReplicationRepository) {
908            let mut repo = repo.await;
909            assert_eq!(
910                repo.remove_replication("test-2").await,
911                Err(not_found!("Replication 'test-2' does not exist")),
912                "Should not remove non existing replication"
913            );
914        }
915
916        #[rstest]
917        #[tokio::test]
918        async fn test_remove_provisioned_replication(
919            #[future] mut repo: ReplicationRepository,
920            settings: ReplicationSettings,
921        ) {
922            let mut repo = repo.await;
923            repo.create_replication("test", settings.clone())
924                .await
925                .unwrap();
926
927            repo.set_replication_provisioned("test", true)
928                .await
929                .unwrap();
930
931            assert_eq!(
932                repo.remove_replication("test").await,
933                Err(conflict!("Can't remove provisioned replication 'test'")),
934                "Should not remove provisioned replication"
935            );
936        }
937    }
938
939    mod get {
940        use super::*;
941        use reduct_base::io::RecordMeta;
942
943        #[rstest]
944        #[tokio::test]
945        async fn test_get_replication(
946            #[future] mut repo: ReplicationRepository,
947            settings: ReplicationSettings,
948        ) {
949            let mut repo = repo.await;
950            repo.create_replication("test", settings.clone())
951                .await
952                .unwrap();
953            {
954                repo.notify(TransactionNotification {
955                    bucket: "bucket-1".to_string(),
956                    entry: "entry-1".to_string(),
957                    meta: RecordMeta::builder().build(),
958                    event: WriteRecord(0),
959                })
960                .await
961                .unwrap();
962                sleep(Duration::from_millis(100)).await;
963            }
964
965            let info = repo.get_info("test").await.unwrap();
966            assert_eq!(info.settings.src_bucket, settings.src_bucket);
967            assert_eq!(info.info.name, "test");
968        }
969
970        #[rstest]
971        #[tokio::test]
972        async fn test_get_non_existing_replication(#[future] repo: ReplicationRepository) {
973            let repo = repo.await;
974            assert_eq!(
975                repo.get_info("test-2").await.err(),
976                Some(not_found!("Replication 'test-2' does not exist")),
977                "Should not get non existing replication"
978            );
979            assert_eq!(
980                repo.get_replication_settings("test-2").await.err(),
981                Some(not_found!("Replication 'test-2' does not exist")),
982                "Should not get settings for non existing replication"
983            );
984            assert_eq!(
985                repo.is_replication_running("test-2").await.err(),
986                Some(not_found!("Replication 'test-2' does not exist")),
987                "Should not get running state for non existing replication"
988            );
989        }
990
991        #[rstest]
992        #[tokio::test]
993        async fn test_get_mut_non_existing_replication(#[future] mut repo: ReplicationRepository) {
994            let mut repo = repo.await;
995            assert_eq!(
996                repo.set_replication_provisioned("test-2", true).await.err(),
997                Some(not_found!("Replication 'test-2' does not exist")),
998                "Should not get non existing replication"
999            );
1000        }
1001    }
1002
1003    mod notify {
1004        use super::*;
1005        use reduct_base::io::RecordMeta;
1006        use tokio::time::{sleep, Duration};
1007
1008        #[rstest]
1009        #[tokio::test]
1010        async fn test_notify_replication(
1011            #[future] mut repo: ReplicationRepository,
1012            settings: ReplicationSettings,
1013        ) {
1014            let mut repo = repo.await;
1015            repo.create_replication("test", settings.clone())
1016                .await
1017                .unwrap();
1018
1019            let notification = TransactionNotification {
1020                bucket: "bucket-1".to_string(),
1021                entry: "entry-1".to_string(),
1022                meta: RecordMeta::builder().build(),
1023                event: WriteRecord(0),
1024            };
1025
1026            repo.notify(notification.clone()).await.unwrap();
1027            sleep(Duration::from_millis(50)).await;
1028            assert_eq!(repo.get_info("test").await.unwrap().info.pending_records, 1);
1029        }
1030
1031        #[rstest]
1032        #[tokio::test]
1033        async fn test_notify_wrong_bucket(
1034            #[future] mut repo: ReplicationRepository,
1035            settings: ReplicationSettings,
1036        ) {
1037            let mut repo = repo.await;
1038            repo.create_replication("test", settings.clone())
1039                .await
1040                .unwrap();
1041
1042            let notification = TransactionNotification {
1043                bucket: "bucket-2".to_string(),
1044                entry: "entry-1".to_string(),
1045                meta: RecordMeta::builder().build(),
1046                event: WriteRecord(0),
1047            };
1048
1049            repo.notify(notification).await.unwrap();
1050            sleep(Duration::from_millis(50)).await;
1051            assert_eq!(
1052                repo.get_info("test").await.unwrap().info.pending_records,
1053                0,
1054                "Should not notify replication for wrong bucket"
1055            );
1056        }
1057
1058        #[rstest]
1059        #[tokio::test]
1060        async fn test_notify_after_stop_returns_error(
1061            #[future] mut repo: ReplicationRepository,
1062            settings: ReplicationSettings,
1063        ) {
1064            let mut repo = repo.await;
1065            repo.create_replication("test", settings).await.unwrap();
1066            repo.stop().await;
1067
1068            let notification = TransactionNotification {
1069                bucket: "bucket-1".to_string(),
1070                entry: "entry-1".to_string(),
1071                meta: RecordMeta::builder().build(),
1072                event: WriteRecord(0),
1073            };
1074
1075            assert_eq!(
1076                repo.notify(notification).await.err(),
1077                Some(internal_server_error!(
1078                    "Failed to enqueue replication notification"
1079                ))
1080            );
1081        }
1082
1083        #[rstest]
1084        #[tokio::test]
1085        async fn test_notify_skips_non_matching_replications(
1086            #[future] mut repo: ReplicationRepository,
1087            settings: ReplicationSettings,
1088        ) {
1089            let mut repo = repo.await;
1090            let mut settings_1 = settings.clone();
1091            settings_1.src_bucket = "bucket-1".to_string();
1092            repo.create_replication("test-1", settings_1).await.unwrap();
1093
1094            repo.storage
1095                .create_bucket("bucket-2", BucketSettings::default())
1096                .await
1097                .unwrap();
1098            let mut settings_2 = settings;
1099            settings_2.src_bucket = "bucket-2".to_string();
1100            settings_2.dst_bucket = "bucket-1".to_string();
1101            repo.create_replication("test-2", settings_2).await.unwrap();
1102
1103            let notification = TransactionNotification {
1104                bucket: "bucket-1".to_string(),
1105                entry: "entry-1".to_string(),
1106                meta: RecordMeta::builder().build(),
1107                event: WriteRecord(0),
1108            };
1109
1110            repo.notify(notification).await.unwrap();
1111            sleep(Duration::from_millis(50)).await;
1112            assert_eq!(
1113                repo.get_info("test-1").await.unwrap().info.pending_records,
1114                1
1115            );
1116            assert_eq!(
1117                repo.get_info("test-2").await.unwrap().info.pending_records,
1118                0
1119            );
1120        }
1121
1122        #[rstest]
1123        #[tokio::test]
1124        #[serial]
1125        async fn test_notify_worker_lock_timeout(
1126            #[future] mut repo: ReplicationRepository,
1127            settings: ReplicationSettings,
1128        ) {
1129            struct ResetGuard;
1130            impl Drop for ResetGuard {
1131                fn drop(&mut self) {
1132                    reset_rwlock_config();
1133                }
1134            }
1135            let _reset = ResetGuard;
1136
1137            let mut repo = repo.await;
1138            repo.create_replication("test", settings).await.unwrap();
1139
1140            set_rwlock_timeout(Duration::from_millis(20));
1141            let replications = Arc::clone(&repo.replications);
1142            let guard = replications.read().await.unwrap();
1143
1144            let notification = TransactionNotification {
1145                bucket: "bucket-1".to_string(),
1146                entry: "entry-1".to_string(),
1147                meta: RecordMeta::builder().build(),
1148                event: WriteRecord(0),
1149            };
1150            repo.notify(notification).await.unwrap();
1151            sleep(Duration::from_millis(50)).await;
1152            drop(guard);
1153
1154            assert_eq!(repo.get_info("test").await.unwrap().info.pending_records, 0);
1155        }
1156    }
1157
1158    mod start {
1159        use super::*;
1160
1161        #[rstest]
1162        #[tokio::test]
1163        async fn test_start_all(
1164            #[future] mut repo: ReplicationRepository,
1165            settings: ReplicationSettings,
1166        ) {
1167            let mut repo = repo.await;
1168            repo.create_replication("test-1", settings.clone())
1169                .await
1170                .unwrap();
1171            repo.create_replication("test-2", settings.clone())
1172                .await
1173                .unwrap();
1174
1175            repo.start();
1176            assert!(
1177                repo.is_replication_running("test-1").await.unwrap(),
1178                "Replication 'test-1' should be running"
1179            );
1180            assert!(
1181                repo.is_replication_running("test-2").await.unwrap(),
1182                "Replication 'test-2' should be running"
1183            );
1184        }
1185
1186        #[rstest]
1187        #[tokio::test]
1188        async fn test_double_start(
1189            #[future] mut repo: ReplicationRepository,
1190            settings: ReplicationSettings,
1191        ) {
1192            let mut repo = repo.await;
1193            repo.create_replication("test-1", settings.clone())
1194                .await
1195                .unwrap();
1196
1197            repo.start();
1198            repo.start(); // second start should have no effect
1199
1200            assert!(
1201                repo.is_replication_running("test-1").await.unwrap(),
1202                "Replication 'test-1' should be running"
1203            );
1204        }
1205
1206        #[rstest]
1207        #[tokio::test]
1208        async fn test_start_all_when_lock_contended(
1209            #[future] mut repo: ReplicationRepository,
1210            settings: ReplicationSettings,
1211        ) {
1212            let mut repo = repo.await;
1213            repo.create_replication("test-1", settings.clone())
1214                .await
1215                .unwrap();
1216            repo.create_replication("test-2", settings).await.unwrap();
1217
1218            let replications = Arc::clone(&repo.replications);
1219            let guard = replications.write().await.unwrap();
1220            repo.start();
1221            drop(guard);
1222            sleep(Duration::from_millis(50)).await;
1223
1224            assert!(repo.is_replication_running("test-1").await.unwrap());
1225            assert!(repo.is_replication_running("test-2").await.unwrap());
1226        }
1227
1228        #[rstest]
1229        #[tokio::test]
1230        #[serial]
1231        async fn test_start_all_lock_timeout(
1232            #[future] mut repo: ReplicationRepository,
1233            settings: ReplicationSettings,
1234        ) {
1235            struct ResetGuard;
1236            impl Drop for ResetGuard {
1237                fn drop(&mut self) {
1238                    reset_rwlock_config();
1239                }
1240            }
1241            let _reset = ResetGuard;
1242
1243            let mut repo = repo.await;
1244            repo.create_replication("test-1", settings).await.unwrap();
1245
1246            set_rwlock_timeout(Duration::from_millis(20));
1247            let replications = Arc::clone(&repo.replications);
1248            let guard = replications.read().await.unwrap();
1249            repo.start();
1250            sleep(Duration::from_millis(50)).await;
1251            drop(guard);
1252
1253            assert!(!repo.is_replication_running("test-1").await.unwrap());
1254        }
1255    }
1256
1257    mod set_mode {
1258        use super::*;
1259
1260        #[rstest]
1261        #[tokio::test]
1262        async fn test_set_mode(
1263            #[future] mut repo: ReplicationRepository,
1264            settings: ReplicationSettings,
1265        ) {
1266            let mut repo = repo.await;
1267            repo.create_replication("test-1", settings.clone())
1268                .await
1269                .unwrap();
1270            repo.set_mode("test-1", ReplicationMode::Paused)
1271                .await
1272                .unwrap();
1273
1274            assert_eq!(
1275                repo.get_info("test-1").await.unwrap().info.mode,
1276                ReplicationMode::Paused
1277            );
1278        }
1279
1280        #[rstest]
1281        #[tokio::test]
1282        async fn test_set_mode_non_existing(#[future] mut repo: ReplicationRepository) {
1283            let mut repo = repo.await;
1284            assert_eq!(
1285                repo.set_mode("test-1", ReplicationMode::Paused).await,
1286                Err(not_found!("Replication 'test-1' does not exist"))
1287            );
1288        }
1289    }
1290
1291    #[fixture]
1292    fn settings() -> ReplicationSettings {
1293        ReplicationSettings {
1294            src_bucket: "bucket-1".to_string(),
1295            dst_bucket: "bucket-2".to_string(),
1296            dst_host: "http://localhost".to_string(),
1297            dst_token: Some("token".to_string()),
1298            entries: vec!["entry-1".to_string()],
1299            include: Labels::default(),
1300            exclude: Labels::default(),
1301            each_n: None,
1302            each_s: None,
1303            when: None,
1304            mode: ReplicationMode::Enabled,
1305        }
1306    }
1307
1308    #[fixture]
1309    async fn storage() -> Arc<StorageEngine> {
1310        let tmp_dir = tempfile::tempdir().unwrap();
1311        let cfg = Cfg {
1312            data_path: tmp_dir.keep(),
1313            ..Cfg::default()
1314        };
1315        let storage = StorageEngine::builder()
1316            .with_data_path(cfg.data_path.clone())
1317            .with_cfg(cfg)
1318            .build()
1319            .await;
1320
1321        let bucket = storage
1322            .create_bucket("bucket-1", BucketSettings::default())
1323            .await
1324            .unwrap()
1325            .upgrade_and_unwrap();
1326        let _ = bucket.get_or_create_entry("entry-1").await.unwrap();
1327        Arc::new(storage)
1328    }
1329
1330    #[fixture]
1331    async fn repo(#[future] storage: Arc<StorageEngine>) -> ReplicationRepository {
1332        let storage = storage.await;
1333        ReplicationRepository::load_or_create(storage, Cfg::default()).await
1334    }
1335}