operator_nodes_client/
lib.rs1mod handlers;
2use handlers::router::new_router;
3use tokio::net::TcpListener;
4use tokio::task;
5use solana_client::rpc_client::RpcClient;
6use solana_client::rpc_config::RpcTransactionConfig;
7use solana_sdk::commitment_config::CommitmentConfig;
8use solana_sdk::pubkey::Pubkey;
9use solana_transaction_status_client_types::option_serializer::OptionSerializer;
10use base64::{engine::general_purpose, Engine as _};
11
/// Condenses one raw transaction log line into a short, human-readable form.
///
/// - `Program data: <payload>` becomes a placeholder stating the byte length of the
///   (still base64-encoded) payload text.
/// - `Program log: <message>` becomes just `<message>`.
/// - Lines containing one of the runtime-noise markers become an empty string.
/// - Anything else is returned unchanged.
///
/// The prefixes are checked in the order listed, so a `Program log: ` line is never
/// treated as noise even if its message contains a noise marker.
fn format_log_line(line: &str) -> String {
    // NOTE(review): validator logs usually look like `Program <id> invoke [1]`, which
    // would not contain the contiguous text "Program invoke" — confirm these markers
    // match the log format actually produced by the target cluster.
    const RUNTIME_NOISE: [&str; 3] = ["BPF program ", "Program invoke", "Program returned"];

    if let Some(payload) = line.strip_prefix("Program data: ") {
        format!("[program data omitted: {} bytes base64]", payload.len())
    } else if let Some(message) = line.strip_prefix("Program log: ") {
        message.to_string()
    } else if RUNTIME_NOISE.iter().any(|marker| line.contains(*marker)) {
        String::new()
    } else {
        line.to_string()
    }
}
24
/// Returns the first run of consecutive ASCII digits in `s`, parsed as a `u64`.
///
/// Leading non-digit characters are skipped; parsing stops at the first non-digit
/// after the run has started. Returns `None` when `s` contains no ASCII digit or
/// when the digit run does not fit in a `u64`.
fn extract_number(s: &str) -> Option<u64> {
    let start = s.find(|c: char| c.is_ascii_digit())?;
    let tail = &s[start..];
    // ASCII digits are single-byte, so `end` always lands on a char boundary.
    let end = tail
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(tail.len());
    tail[..end].parse().ok()
}
36
37fn parse_and_notify(log_lines: &[String], debug: bool) {
38 let mut saw_event = false;
39 let mut operator_owner: Option<String> = None;
40 let mut slashed_amount: Option<u64> = None;
41 let mut remaining_bond: Option<u64> = None;
42
43 fn parse_event_from_program_data(b64: &str) -> Option<(String, u64, u64)> {
44 let bytes = general_purpose::STANDARD.decode(b64).ok()?;
45 if bytes.len() < 56 { return None; }
46 let data = &bytes[8..];
47 if data.len() < 48 { return None; }
48 let mut owner_arr = [0u8; 32];
49 owner_arr.copy_from_slice(&data[0..32]);
50 let owner = Pubkey::new_from_array(owner_arr).to_string();
51 let slashed = u64::from_le_bytes(data[32..40].try_into().ok()?);
52 let remaining = u64::from_le_bytes(data[40..48].try_into().ok()?);
53 Some((owner, slashed, remaining))
54 }
55
56 for line in log_lines.iter() {
57 if line.contains("OperatorSlashedEvent") { saw_event = true; }
58 if line.contains("Instruction: SlashOperator") || line.contains("Operator wrong! Slashing via CPI") { saw_event = true; }
59 if line.contains("Operator slashed") {
60 if let Some(idx) = line.find("Operator slashed") {
61 if let Some(num) = extract_number(&line[idx..]) { slashed_amount = Some(num); }
62 }
63 }
64 if line.contains("slashed_amount") || line.contains("remaining_bond") || line.contains("operator_owner") {
65 if let Some(idx) = line.find("slashed_amount:") {
66 if let Some(num) = extract_number(&line[idx + "slashed_amount:".len()..]) { slashed_amount = Some(num); }
67 }
68 if let Some(idx) = line.find("remaining_bond:") {
69 if let Some(num) = extract_number(&line[idx + "remaining_bond:".len()..]) { remaining_bond = Some(num); }
70 }
71 if let Some(idx) = line.find("operator_owner:") {
72 let tail = line[idx + "operator_owner:".len()..].trim();
73 operator_owner = tail
74 .split_whitespace()
75 .next()
76 .map(|s| s.trim_matches(&[',', '}'][..]).to_string())
77 .map(|mut s| {
78 if let Some(stripped) = s.strip_prefix("Some(").and_then(|x| x.strip_suffix(')')) { s = stripped.to_string(); }
79 s
80 });
81 }
82 }
83 if let Some(b64) = line.strip_prefix("Program data: ") {
84 if let Some((owner, slash, remain)) = parse_event_from_program_data(b64.trim()) {
85 saw_event = true;
86 if operator_owner.is_none() { operator_owner = Some(owner); }
87 if slashed_amount.is_none() { slashed_amount = Some(slash); }
88 if remaining_bond.is_none() { remaining_bond = Some(remain); }
89 }
90 }
91 }
92
93 if saw_event {
94 let owner_disp = operator_owner
95 .or_else(|| std::env::var("OPERATOR_OWNER").ok())
96 .unwrap_or_else(|| "unknown".to_string());
97 let slashed_disp = slashed_amount.map(|v| v.to_string()).unwrap_or_else(|| "unknown".to_string());
98 let remaining_disp = remaining_bond.map(|v| v.to_string()).unwrap_or_else(|| "unknown".to_string());
99 println!(
100 "[NOTIFY] OperatorSlashedEvent: operator_owner={}, slashed_amount={}, remaining_bond={}",
101 owner_disp, slashed_disp, remaining_disp
102 );
103 }
104}
105
106async fn listen_for_slash_events(http_url_override: Option<String>, operator_owner_override: Option<String>, debug: bool) -> anyhow::Result<()> {
107 let http_url = http_url_override
108 .or_else(|| std::env::var("SOLANA_HTTP_URL").ok())
109 .unwrap_or_else(|| "https://api.devnet.solana.com".to_string());
110
111 let program_id = restaking_programs::id();
112 let (treasury_pda, _treasury_bump) = Pubkey::find_program_address(&[b"reward_treasury"], &Pubkey::new_from_array(program_id.to_bytes()));
113
114 let vault_pda = operator_owner_override
115 .or_else(|| std::env::var("OPERATOR_OWNER").ok())
116 .and_then(|s| s.parse::<Pubkey>().ok())
117 .map(|owner| Pubkey::find_program_address(&[b"vault", owner.as_ref()], &Pubkey::new_from_array(program_id.to_bytes())).0);
118
119 let client = RpcClient::new_with_commitment(http_url.clone(), CommitmentConfig::confirmed());
120
121 println!(
122 "Polling for OperatorSlashedEvent logs touching treasury {} via {}",
123 treasury_pda, http_url
124 );
125
126 let mut last_seen_sig: Option<String> = None;
127
128 loop {
129 let mut sigs = client.get_signatures_for_address(&treasury_pda).unwrap_or_default();
130 if let Some(vault) = vault_pda {
131 let mut vs = client.get_signatures_for_address(&vault).unwrap_or_default();
132 sigs.append(&mut vs);
133 sigs.sort_by(|a,b| b.slot.cmp(&a.slot));
134 sigs.dedup_by(|a,b| a.signature == b.signature);
135 }
136
137 if !sigs.is_empty() {
138 let mut new_count = 0usize;
139 for info in sigs.iter() {
140 if let Some(ref last) = last_seen_sig { if &info.signature == last { break; } }
141 new_count += 1;
142 }
143 for info in sigs.iter().take(new_count).rev() {
144 if let Ok(tx) = client.get_transaction_with_config(&info.signature.parse().unwrap_or_default(), RpcTransactionConfig { encoding: None, commitment: Some(CommitmentConfig::confirmed()), max_supported_transaction_version: Some(0) }) {
145 if let Some(meta) = tx.transaction.meta {
146 if let OptionSerializer::Some(logs) = meta.log_messages {
147 if debug {
148 println!("-- tx {} logs --", info.signature);
149 for l in &logs { println!("{}", format_log_line(l)); }
150 }
151 parse_and_notify(&logs, debug);
152 }
153 }
154 }
155 }
156 last_seen_sig = sigs.first().map(|s| s.signature.clone());
157 }
158 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
159 }
160}
161
162pub async fn run(http_url: Option<String>, operator_owner: Option<String>, debug: bool) -> anyhow::Result<()> {
163 task::spawn(listen_for_slash_events(http_url.clone(), operator_owner.clone(), debug));
164 let app = new_router();
165 let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
166 println!("The server is run over http://localhost:3000");
167 axum::serve(listener, app).await.unwrap();
168 Ok(())
169}