Skip to main content

synap_sdk/
hyperloglog.rs

1//! HyperLogLog operations (PFADD/PFCOUNT/PFMERGE)
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::HyperLogLogStats;
6use serde_json::json;
7
8#[derive(Clone)]
9pub struct HyperLogLogManager {
10    client: SynapClient,
11}
12
13impl HyperLogLogManager {
14    pub(crate) fn new(client: SynapClient) -> Self {
15        Self { client }
16    }
17
18    /// Add elements to a HyperLogLog structure (PFADD)
19    pub async fn pfadd<I, T>(&self, key: &str, elements: I) -> Result<usize>
20    where
21        I: IntoIterator<Item = T>,
22        T: AsRef<[u8]>,
23    {
24        let encoded: Vec<Vec<u8>> = elements
25            .into_iter()
26            .map(|el| el.as_ref().to_vec())
27            .collect();
28        if encoded.is_empty() {
29            return Ok(0);
30        }
31
32        let payload = json!({
33            "key": key,
34            "elements": encoded,
35        });
36
37        let response = self
38            .client
39            .send_command("hyperloglog.pfadd", payload)
40            .await?;
41        Ok(response["added"].as_u64().unwrap_or(0) as usize)
42    }
43
44    /// Estimate cardinality of a HyperLogLog structure (PFCOUNT)
45    pub async fn pfcount(&self, key: &str) -> Result<u64> {
46        let payload = json!({"key": key});
47        let response = self
48            .client
49            .send_command("hyperloglog.pfcount", payload)
50            .await?;
51        Ok(response["count"].as_u64().unwrap_or(0))
52    }
53
54    /// Merge multiple HyperLogLog structures into destination (PFMERGE)
55    pub async fn pfmerge<S>(&self, destination: &str, sources: &[S]) -> Result<u64>
56    where
57        S: AsRef<str>,
58    {
59        let payload = json!({
60            "destination": destination,
61            "sources": sources.iter().map(|s| s.as_ref()).collect::<Vec<_>>(),
62        });
63        let response = self
64            .client
65            .send_command("hyperloglog.pfmerge", payload)
66            .await?;
67        Ok(response["count"].as_u64().unwrap_or(0))
68    }
69
70    /// Retrieve HyperLogLog statistics
71    pub async fn stats(&self) -> Result<HyperLogLogStats> {
72        let response = self
73            .client
74            .send_command("hyperloglog.stats", json!({}))
75            .await?;
76        let stats: HyperLogLogStats = serde_json::from_value(response)?;
77        Ok(stats)
78    }
79}