Skip to main content

paged_and_ws/
paged_and_ws.rs

//! Minimal example hitting the public Circles RPC:
//! - paged `circles_query` over `V_Crc.Avatars`
//! - optional WebSocket subscription for Circles events
//!
//! Run with:
//! `CIRCLES_RPC_URL=https://rpc.aboutcircles.com/ cargo run -p circles-rpc --example paged_and_ws --features ws`
//!
//! Set `CIRCLES_RPC_WS_URL` to override the default WebSocket endpoint.
7
8use circles_rpc::CirclesRpc;
9use circles_rpc::Result;
10use circles_types::{Address, PagedQueryParams, SortOrder};
11use futures::StreamExt;
12use serde::Deserialize;
13use std::time::Duration;
14
/// HTTP endpoint used when the `CIRCLES_RPC_URL` environment variable is not set.
const DEFAULT_RPC_URL: &str = "https://rpc.aboutcircles.com/";
/// WebSocket endpoint used when the `CIRCLES_RPC_WS_URL` environment variable is not set.
const DEFAULT_WS_URL: &str = "wss://rpc.aboutcircles.com/ws";
/// Sample address from docs; replace with any avatar you care about.
const SAMPLE_AVATAR: &str = "0xde374ece6fa50e781e81aac78e811b33d16912c7";
19
20#[derive(Debug, Deserialize, serde::Serialize, Clone)]
21struct AvatarRow {
22    avatar: Address,
23    timestamp: u64,
24}
25
26#[tokio::main]
27async fn main() -> Result<()> {
28    let rpc_url = std::env::var("CIRCLES_RPC_URL").unwrap_or_else(|_| DEFAULT_RPC_URL.to_string());
29    let rpc = CirclesRpc::try_from(rpc_url.as_str())?;
30
31    let avatar: Address = SAMPLE_AVATAR.parse().expect("valid sample address");
32
33    // --- Paged query example (first page) ---
34    let params = PagedQueryParams {
35        namespace: "V_Crc".to_string(),
36        table: "Avatars".to_string(),
37        sort_order: SortOrder::DESC,
38        columns: vec!["avatar".into(), "timestamp".into()],
39        filter: None,
40        cursor_columns: None,
41        order_columns: None,
42        limit: 10,
43    };
44
45    let mut pager = rpc.paged_query::<AvatarRow>(params);
46    if let Some(page) = pager.next_page().await? {
47        println!(
48            "Fetched {} avatar rows (has_more={})",
49            page.items.len(),
50            page.has_more
51        );
52        for row in page.items {
53            println!("avatar {} @ {}", row.avatar, row.timestamp);
54        }
55    } else {
56        println!("No rows returned");
57    }
58
59    // --- WebSocket events example (requires `ws` feature) ---
60    #[cfg(feature = "ws")]
61    {
62        let ws_url =
63            std::env::var("CIRCLES_RPC_WS_URL").unwrap_or_else(|_| DEFAULT_WS_URL.to_string());
64        match CirclesRpc::try_from_ws(ws_url.as_str()).await {
65            Ok(rpc_ws) => stream_events(&rpc_ws, avatar).await?,
66            Err(e) => println!("WebSocket connect failed, skipping subscription: {e}"),
67        }
68    }
69    #[cfg(not(feature = "ws"))]
70    {
71        println!("WebSocket example skipped; enable the `ws` feature to run it.");
72    }
73
74    Ok(())
75}
76
/// Prints up to three parsed Circles events from a WebSocket subscription, then
/// runs `debug_raw_ws` for a raw-payload pass.
///
/// The subscription uses an empty filter, so it is not narrowed to `address`;
/// `address` is only forwarded to `debug_raw_ws`. The read loop stops after the
/// third event, on the first stream error, when the stream ends, or after
/// 15 seconds without a message.
///
/// # Errors
/// Every path visible here returns `Ok(())`: a failed subscribe is logged to
/// stderr and stream errors are printed; neither is propagated.
#[cfg(feature = "ws")]
async fn stream_events(rpc: &CirclesRpc, address: Address) -> Result<()> {
    use tokio::time::timeout;

    // Stop after this many events so the example terminates on a busy endpoint.
    const MAX_EVENTS: u32 = 3;
    // Give up if the endpoint stays silent this long.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(15);

    // The filter below is empty, so the message must not claim the subscription
    // is scoped to `address`.
    println!("Subscribing to Circles events (unfiltered firehose)");
    // Empty filter = firehose; add {"address": <addr>} to narrow. Some public WS
    // endpoints currently emit empty [] heartbeats and no payloads.
    let filter = serde_json::json!({});
    let mut sub = match rpc.events().subscribe_parsed_events(filter).await {
        Ok(sub) => sub,
        Err(e) => {
            eprintln!("subscription failed: {e}");
            return Ok(());
        }
    };

    let mut seen = 0u32;
    while seen < MAX_EVENTS {
        match timeout(IDLE_TIMEOUT, sub.next()).await {
            Ok(Some(Ok(evt))) => {
                println!(
                    "event: {:?} @ block {}",
                    evt.event_type, evt.base.block_number
                );
                seen += 1;
            }
            Ok(Some(Err(e))) => {
                println!("event stream error: {e}");
                break;
            }
            Ok(None) => {
                println!("subscription closed");
                break;
            }
            // The outer `Err` is the timeout elapsing, not a stream failure.
            Err(_) => {
                println!("no events within timeout, stopping");
                break;
            }
        }
    }

    println!("Unsubscribing after {seen} events");
    // Drop will best-effort eth_unsubscribe, but explicit is fine too.
    let _ = sub.unsubscribe();

    // Debug helper: also try a raw Value subscription to inspect payloads when parsing fails.
    debug_raw_ws(address).await;
    Ok(())
}
129
/// Debug helper: opens a separate raw WebSocket connection, subscribes to
/// `circles` with an `address` filter, and prints up to three undecoded messages.
///
/// The endpoint comes from `CIRCLES_RPC_WS_URL`, falling back to `DEFAULT_WS_URL`.
/// Connect and stream-setup failures are logged to stderr and the function
/// returns. The read loop also stops when the stream ends or after 15 seconds
/// without a message, so a silent endpoint cannot hang the example.
#[cfg(feature = "ws")]
async fn debug_raw_ws(address: Address) {
    use alloy_provider::{Identity, Provider, ProviderBuilder};
    use alloy_transport_ws::WsConnect;
    use futures::StreamExt;
    use serde_json::Value;
    use tokio::time::timeout;

    // Stop after this many messages so the helper terminates on a busy endpoint.
    const MAX_MESSAGES: u8 = 3;
    // Same idle limit as the parsed-event loop in `stream_events`.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(15);

    let ws_url = std::env::var("CIRCLES_RPC_WS_URL").unwrap_or_else(|_| DEFAULT_WS_URL.to_string());
    println!("Debugging raw WS frames from {ws_url}");

    // `ws_url` is not needed after this point, so it is moved rather than cloned.
    let provider: alloy_provider::RootProvider =
        match ProviderBuilder::<Identity, Identity>::default()
            .connect_ws(WsConnect::new(ws_url))
            .await
        {
            Ok(p) => p,
            Err(e) => {
                eprintln!("raw ws connect failed: {e}");
                return;
            }
        };

    let filter = serde_json::json!({ "address": address });
    let sub = provider.subscribe::<_, Value>(("circles", filter));
    let mut stream = match circles_rpc::EventStream::from_subscription(sub).await {
        Ok((stream, _id)) => stream,
        Err(e) => {
            eprintln!("failed to build raw stream: {e}");
            return;
        }
    };

    let mut count = 0u8;
    while count < MAX_MESSAGES {
        match timeout(IDLE_TIMEOUT, stream.next()).await {
            Ok(Some(msg)) => {
                println!("raw ws message: {msg:?}");
                count += 1;
            }
            // Stream ended.
            Ok(None) => break,
            // Timeout elapsed with no message.
            Err(_) => {
                println!("no raw ws messages within timeout, stopping");
                break;
            }
        }
    }
}
167}