// paged_and_ws/paged_and_ws.rs
use circles_rpc::CirclesRpc;
use circles_rpc::Result;
use circles_types::{Address, PagedQueryParams, SortOrder};
use futures::StreamExt;
use serde::Deserialize;
use std::time::Duration;
14
/// HTTP RPC endpoint used when the `CIRCLES_RPC_URL` environment variable is unset.
const DEFAULT_RPC_URL: &str = "https://rpc.aboutcircles.com/";
/// WebSocket endpoint used when the `CIRCLES_RPC_WS_URL` environment variable is unset.
const DEFAULT_WS_URL: &str = "wss://rpc.aboutcircles.com/ws";
/// Hex address parsed by `main` and handed to the WebSocket part of the example.
const SAMPLE_AVATAR: &str = "0xde374ece6fa50e781e81aac78e811b33d16912c7";
19
20#[derive(Debug, Deserialize, serde::Serialize, Clone)]
21struct AvatarRow {
22 avatar: Address,
23 timestamp: u64,
24}
25
26#[tokio::main]
27async fn main() -> Result<()> {
28 let rpc_url = std::env::var("CIRCLES_RPC_URL").unwrap_or_else(|_| DEFAULT_RPC_URL.to_string());
29 let rpc = CirclesRpc::try_from(rpc_url.as_str())?;
30
31 let avatar: Address = SAMPLE_AVATAR.parse().expect("valid sample address");
32
33 let params = PagedQueryParams {
35 namespace: "V_Crc".to_string(),
36 table: "Avatars".to_string(),
37 sort_order: SortOrder::DESC,
38 columns: vec!["avatar".into(), "timestamp".into()],
39 filter: None,
40 cursor_columns: None,
41 order_columns: None,
42 limit: 10,
43 };
44
45 let mut pager = rpc.paged_query::<AvatarRow>(params);
46 if let Some(page) = pager.next_page().await? {
47 println!(
48 "Fetched {} avatar rows (has_more={})",
49 page.items.len(),
50 page.has_more
51 );
52 for row in page.items {
53 println!("avatar {} @ {}", row.avatar, row.timestamp);
54 }
55 } else {
56 println!("No rows returned");
57 }
58
59 #[cfg(feature = "ws")]
61 {
62 let ws_url =
63 std::env::var("CIRCLES_RPC_WS_URL").unwrap_or_else(|_| DEFAULT_WS_URL.to_string());
64 match CirclesRpc::try_from_ws(ws_url.as_str()).await {
65 Ok(rpc_ws) => stream_events(&rpc_ws, avatar).await?,
66 Err(e) => println!("WebSocket connect failed, skipping subscription: {e}"),
67 }
68 }
69 #[cfg(not(feature = "ws"))]
70 {
71 println!("WebSocket example skipped; enable the `ws` feature to run it.");
72 }
73
74 Ok(())
75}
76
/// Subscribes to parsed Circles events and prints up to three of them.
///
/// Each wait for the next event is capped at 15 seconds. Reading stops after
/// the third event, on the first stream error, when the subscription ends, or
/// when a wait times out. Afterwards the subscription is released and
/// `debug_raw_ws` is run for the same `address`.
///
/// Every path visible here returns `Ok(())`; a failed subscribe is logged to
/// stderr and treated as "nothing to do".
#[cfg(feature = "ws")]
async fn stream_events(rpc: &CirclesRpc, address: Address) -> Result<()> {
    use tokio::time::timeout;

    /// Number of events to print before stopping.
    const MAX_EVENTS: u32 = 3;
    /// Longest wait for a single event.
    const IDLE_LIMIT: Duration = Duration::from_secs(15);

    println!("Subscribing to Circles events for address {address}");
    // NOTE(review): the filter sent is empty although the log line above names
    // `address` — confirm that subscribing unfiltered is intended.
    let attempt = rpc
        .events()
        .subscribe_parsed_events(serde_json::json!({}))
        .await;
    let mut sub = match attempt {
        Err(e) => {
            eprintln!("subscription failed: {e}");
            return Ok(());
        }
        Ok(stream) => stream,
    };

    let mut seen = 0u32;
    while seen < MAX_EVENTS {
        let Ok(next) = timeout(IDLE_LIMIT, sub.next()).await else {
            println!("no events within timeout, stopping");
            break;
        };
        match next {
            None => {
                println!("subscription closed");
                break;
            }
            Some(Err(e)) => {
                println!("event stream error: {e}");
                break;
            }
            Some(Ok(evt)) => {
                println!(
                    "event: {:?} @ block {}",
                    evt.event_type, evt.base.block_number
                );
                seen += 1;
            }
        }
    }

    println!("Unsubscribing after {seen} events");
    // NOTE(review): the return value is discarded; if `unsubscribe` yields a
    // future or a `Result`, nothing is awaited or checked here — confirm.
    let _ = sub.unsubscribe();

    debug_raw_ws(address).await;
    Ok(())
}
129
/// Opens a separate WebSocket connection and prints up to three raw
/// `circles` subscription messages filtered by `address`.
///
/// The endpoint comes from `CIRCLES_RPC_WS_URL`, falling back to
/// `DEFAULT_WS_URL`. Connection and stream-construction failures are written
/// to stderr and the function simply returns.
///
/// Each wait for a message is capped at 15 seconds, mirroring
/// `stream_events`, so a quiet endpoint cannot stall the example forever.
#[cfg(feature = "ws")]
async fn debug_raw_ws(address: Address) {
    use alloy_provider::{Identity, Provider, ProviderBuilder};
    use alloy_transport_ws::WsConnect;
    use futures::StreamExt;
    use serde_json::Value;
    use tokio::time::timeout;

    /// Number of raw messages to print before stopping.
    const MAX_MESSAGES: u8 = 3;
    /// Longest wait for a single raw message.
    const IDLE_LIMIT: std::time::Duration = std::time::Duration::from_secs(15);

    let ws_url = std::env::var("CIRCLES_RPC_WS_URL").unwrap_or_else(|_| DEFAULT_WS_URL.to_string());
    println!("Debugging raw WS frames from {ws_url}");

    // `ws_url` is not needed after this point, so it is moved instead of cloned.
    let provider: alloy_provider::RootProvider =
        match ProviderBuilder::<Identity, Identity>::default()
            .connect_ws(WsConnect::new(ws_url))
            .await
        {
            Ok(p) => p,
            Err(e) => {
                eprintln!("raw ws connect failed: {e}");
                return;
            }
        };

    let filter = serde_json::json!({ "address": address });
    let sub = provider.subscribe::<_, Value>(("circles", filter));
    match circles_rpc::EventStream::from_subscription(sub).await {
        Ok((mut stream, _id)) => {
            let mut count = 0u8;
            while count < MAX_MESSAGES {
                match timeout(IDLE_LIMIT, stream.next()).await {
                    Ok(Some(msg)) => {
                        println!("raw ws message: {msg:?}");
                        count += 1;
                    }
                    // Stream ended before the message budget was used up.
                    Ok(None) => break,
                    Err(_) => {
                        println!("no raw ws messages within timeout, stopping");
                        break;
                    }
                }
            }
        }
        Err(e) => eprintln!("failed to build raw stream: {e}"),
    }
}