operator_nodes_client/
lib.rs

1mod handlers;
2use handlers::router::new_router;
3use tokio::net::TcpListener;
4use tokio::task;
5use solana_client::rpc_client::RpcClient;
6use solana_client::rpc_config::RpcTransactionConfig;
7use solana_sdk::commitment_config::CommitmentConfig;
8use solana_sdk::pubkey::Pubkey;
9use solana_transaction_status_client_types::option_serializer::OptionSerializer;
10use base64::{engine::general_purpose, Engine as _};
11
12fn format_log_line(line: &str) -> String {
13    if let Some(rest) = line.strip_prefix("Program data: ") {
14        return format!("[program data omitted: {} bytes base64]", rest.len());
15    }
16    if let Some(rest) = line.strip_prefix("Program log: ") {
17        return rest.to_string();
18    }
19    if line.contains("BPF program ") || line.contains("Program invoke") || line.contains("Program returned") {
20        return String::new();
21    }
22    line.to_string()
23}
24
25fn extract_number(s: &str) -> Option<u64> {
26    let mut digits = String::new();
27    for ch in s.chars() {
28        if ch.is_ascii_digit() {
29            digits.push(ch);
30        } else if !digits.is_empty() {
31            break;
32        }
33    }
34    digits.parse::<u64>().ok()
35}
36
37fn parse_and_notify(log_lines: &[String], debug: bool) {
38    let mut saw_event = false;
39    let mut operator_owner: Option<String> = None;
40    let mut slashed_amount: Option<u64> = None;
41    let mut remaining_bond: Option<u64> = None;
42
43    fn parse_event_from_program_data(b64: &str) -> Option<(String, u64, u64)> {
44        let bytes = general_purpose::STANDARD.decode(b64).ok()?;
45        if bytes.len() < 56 { return None; }
46        let data = &bytes[8..];
47        if data.len() < 48 { return None; }
48        let mut owner_arr = [0u8; 32];
49        owner_arr.copy_from_slice(&data[0..32]);
50        let owner = Pubkey::new_from_array(owner_arr).to_string();
51        let slashed = u64::from_le_bytes(data[32..40].try_into().ok()?);
52        let remaining = u64::from_le_bytes(data[40..48].try_into().ok()?);
53        Some((owner, slashed, remaining))
54    }
55
56    for line in log_lines.iter() {
57        if line.contains("OperatorSlashedEvent") { saw_event = true; }
58        if line.contains("Instruction: SlashOperator") || line.contains("Operator wrong! Slashing via CPI") { saw_event = true; }
59        if line.contains("Operator slashed") {
60            if let Some(idx) = line.find("Operator slashed") {
61                if let Some(num) = extract_number(&line[idx..]) { slashed_amount = Some(num); }
62            }
63        }
64        if line.contains("slashed_amount") || line.contains("remaining_bond") || line.contains("operator_owner") {
65            if let Some(idx) = line.find("slashed_amount:") {
66                if let Some(num) = extract_number(&line[idx + "slashed_amount:".len()..]) { slashed_amount = Some(num); }
67            }
68            if let Some(idx) = line.find("remaining_bond:") {
69                if let Some(num) = extract_number(&line[idx + "remaining_bond:".len()..]) { remaining_bond = Some(num); }
70            }
71            if let Some(idx) = line.find("operator_owner:") {
72                let tail = line[idx + "operator_owner:".len()..].trim();
73                operator_owner = tail
74                    .split_whitespace()
75                    .next()
76                    .map(|s| s.trim_matches(&[',', '}'][..]).to_string())
77                    .map(|mut s| {
78                        if let Some(stripped) = s.strip_prefix("Some(").and_then(|x| x.strip_suffix(')')) { s = stripped.to_string(); }
79                        s
80                    });
81            }
82        }
83        if let Some(b64) = line.strip_prefix("Program data: ") {
84            if let Some((owner, slash, remain)) = parse_event_from_program_data(b64.trim()) {
85                saw_event = true;
86                if operator_owner.is_none() { operator_owner = Some(owner); }
87                if slashed_amount.is_none() { slashed_amount = Some(slash); }
88                if remaining_bond.is_none() { remaining_bond = Some(remain); }
89            }
90        }
91    }
92
93    if saw_event {
94        let owner_disp = operator_owner
95            .or_else(|| std::env::var("OPERATOR_OWNER").ok())
96            .unwrap_or_else(|| "unknown".to_string());
97        let slashed_disp = slashed_amount.map(|v| v.to_string()).unwrap_or_else(|| "unknown".to_string());
98        let remaining_disp = remaining_bond.map(|v| v.to_string()).unwrap_or_else(|| "unknown".to_string());
99        println!(
100            "[NOTIFY] OperatorSlashedEvent: operator_owner={}, slashed_amount={}, remaining_bond={}",
101            owner_disp, slashed_disp, remaining_disp
102        );
103    }
104}
105
106async fn listen_for_slash_events(http_url_override: Option<String>, operator_owner_override: Option<String>, debug: bool) -> anyhow::Result<()> {
107    let http_url = http_url_override
108        .or_else(|| std::env::var("SOLANA_HTTP_URL").ok())
109        .unwrap_or_else(|| "https://api.devnet.solana.com".to_string());
110
111    let program_id = restaking_programs::id();
112    let (treasury_pda, _treasury_bump) = Pubkey::find_program_address(&[b"reward_treasury"], &Pubkey::new_from_array(program_id.to_bytes()));
113
114    let vault_pda = operator_owner_override
115        .or_else(|| std::env::var("OPERATOR_OWNER").ok())
116        .and_then(|s| s.parse::<Pubkey>().ok())
117        .map(|owner| Pubkey::find_program_address(&[b"vault", owner.as_ref()], &Pubkey::new_from_array(program_id.to_bytes())).0);
118
119    let client = RpcClient::new_with_commitment(http_url.clone(), CommitmentConfig::confirmed());
120
121    println!(
122        "Polling for OperatorSlashedEvent logs touching treasury {} via {}",
123        treasury_pda, http_url
124    );
125
126    let mut last_seen_sig: Option<String> = None;
127
128    loop {
129        let mut sigs = client.get_signatures_for_address(&treasury_pda).unwrap_or_default();
130        if let Some(vault) = vault_pda {
131            let mut vs = client.get_signatures_for_address(&vault).unwrap_or_default();
132            sigs.append(&mut vs);
133            sigs.sort_by(|a,b| b.slot.cmp(&a.slot));
134            sigs.dedup_by(|a,b| a.signature == b.signature);
135        }
136
137        if !sigs.is_empty() {
138            let mut new_count = 0usize;
139            for info in sigs.iter() {
140                if let Some(ref last) = last_seen_sig { if &info.signature == last { break; } }
141                new_count += 1;
142            }
143            for info in sigs.iter().take(new_count).rev() {
144                if let Ok(tx) = client.get_transaction_with_config(&info.signature.parse().unwrap_or_default(), RpcTransactionConfig { encoding: None, commitment: Some(CommitmentConfig::confirmed()), max_supported_transaction_version: Some(0) }) {
145                    if let Some(meta) = tx.transaction.meta {
146                        if let OptionSerializer::Some(logs) = meta.log_messages {
147                            if debug {
148                                println!("-- tx {} logs --", info.signature);
149                                for l in &logs { println!("{}", format_log_line(l)); }
150                            }
151                            parse_and_notify(&logs, debug);
152                        }
153                    }
154                }
155            }
156            last_seen_sig = sigs.first().map(|s| s.signature.clone());
157        }
158        tokio::time::sleep(std::time::Duration::from_secs(3)).await;
159    }
160}
161
162pub async fn run(http_url: Option<String>, operator_owner: Option<String>, debug: bool) -> anyhow::Result<()> {
163    task::spawn(listen_for_slash_events(http_url.clone(), operator_owner.clone(), debug));
164    let app = new_router();
165    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
166    println!("The server is run over http://localhost:3000");
167    axum::serve(listener, app).await.unwrap();
168    Ok(())
169}