Skip to main content

shilp_sdk/
oplog.rs

1use crate::client::Client;
2use crate::error::Result;
3use crate::models::{
4    GenericResponse, GetOplogResponse, OplogStatusResponse, RegisterReplicaRequest,
5    UnRegisterReplicaRequest, UpdateReplicaLSNRequest, UpdateReplicaLSNResponse,
6};
7use std::collections::HashMap;
8
9impl Client {
10    /// Retrieves oplog entries after a specific LSN for replica synchronization
11    /// Parameters:
12    /// - collection: Collection name to retrieve oplog entries for (optional, if empty returns entries for all collections)
13    /// - after_lsn: LSN after which to retrieve oplog entries (required)
14    /// - limit: Maximum number of oplog entries to retrieve (optional)
15    pub async fn get_oplog_entries(
16        &self,
17        collection: Option<&str>,
18        after_lsn: u64,
19        limit: Option<i32>,
20    ) -> Result<GetOplogResponse> {
21        let mut params = HashMap::new();
22        params.insert("after_lsn".to_string(), after_lsn.to_string());
23
24        if let Some(coll) = collection {
25            params.insert("collection".to_string(), coll.to_string());
26        }
27
28        if let Some(lim) = limit {
29            params.insert("limit".to_string(), lim.to_string());
30        }
31
32        self.do_request::<GetOplogResponse, ()>(
33            reqwest::Method::GET,
34            "/api/oplog/v1/",
35            None,
36            Some(&params),
37        )
38        .await
39    }
40
41    /// Updates the last applied LSN for a replica (heartbeat)
42    /// Parameters:
43    /// - collection: Collection name
44    /// - replica_id: Replica identifier
45    /// - lsn: Last applied LSN
46    pub async fn update_replica_lsn(
47        &self,
48        collection: &str,
49        replica_id: &str,
50        lsn: u64,
51    ) -> Result<UpdateReplicaLSNResponse> {
52        let req = UpdateReplicaLSNRequest {
53            collection: collection.to_string(),
54            replica_id: replica_id.to_string(),
55            lsn,
56        };
57
58        self.do_request(
59            reqwest::Method::POST,
60            "/api/oplog/v1/heartbeat",
61            Some(&req),
62            None,
63        )
64        .await
65    }
66
67    /// Registers a replica for oplog retention tracking
68    /// Parameters:
69    /// - replica_id: Replica identifier
70    pub async fn register_replica(&self, replica_id: &str) -> Result<GenericResponse> {
71        let req = RegisterReplicaRequest {
72            replica_id: replica_id.to_string(),
73        };
74
75        self.do_request(
76            reqwest::Method::POST,
77            "/api/oplog/v1/register",
78            Some(&req),
79            None,
80        )
81        .await
82    }
83
84    /// Unregisters a replica for oplog retention tracking
85    /// Parameters:
86    /// - replica_id: Replica identifier
87    pub async fn unregister_replica(&self, replica_id: &str) -> Result<GenericResponse> {
88        let req = UnRegisterReplicaRequest {
89            replica_id: replica_id.to_string(),
90        };
91
92        self.do_request(
93            reqwest::Method::POST,
94            "/api/oplog/v1/unregister",
95            Some(&req),
96            None,
97        )
98        .await
99    }
100
101    /// Retrieves current oplog status and statistics for a collection
102    /// Parameters:
103    /// - collection: Collection name to retrieve oplog status for (required)
104    pub async fn get_oplog_status(&self, collection: &str) -> Result<OplogStatusResponse> {
105        let mut params = HashMap::new();
106        params.insert("collection".to_string(), collection.to_string());
107
108        self.do_request::<OplogStatusResponse, ()>(
109            reqwest::Method::GET,
110            "/api/oplog/v1/status",
111            None,
112            Some(&params),
113        )
114        .await
115    }
116}