1use std::{collections::HashMap, sync::Mutex};
2
3use agave_geyser_plugin_interface::geyser_plugin_interface::{
4 GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
5 ReplicaEntryInfoVersions, ReplicaTransactionInfoV2, ReplicaTransactionInfoV3,
6 ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
7};
8use ipc_channel::ipc::IpcSender;
9use solana_clock::Slot;
10use surfpool_types::{DataIndexingCommand, SubgraphPluginConfig};
11use txtx_addon_kit::types::types::Value;
12use txtx_addon_network_svm::Pubkey;
13use txtx_addon_network_svm_types::subgraph::{
14 IndexedSubgraphSourceType, PdaSubgraphSource, SubgraphRequest,
15};
16use uuid::Uuid;
17
18#[derive(Default, Debug)]
19pub struct SurfpoolSubgraphPlugin {
20 pub uuid: Uuid,
21 subgraph_indexing_event_tx: Mutex<Option<IpcSender<DataIndexingCommand>>>,
22 subgraph_request: Option<SubgraphRequest>,
23 pda_mappings: Mutex<PdaMapping>,
24 account_update_purgatory: Mutex<AccountPurgatory>,
25}
26
27impl GeyserPlugin for SurfpoolSubgraphPlugin {
28 fn name(&self) -> &'static str {
29 "surfpool-subgraph"
30 }
31
32 fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> {
33 let config = serde_json::from_str::<SubgraphPluginConfig>(config_file)
34 .map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?;
35 let oneshot_tx = IpcSender::connect(config.ipc_token).map_err(|e| {
36 GeyserPluginError::Custom(format!("Failed to connect IPC sender: {}", e).into())
37 })?;
38 let (tx, rx) = ipc_channel::ipc::channel().map_err(|e| {
39 GeyserPluginError::Custom(format!("Failed to create IPC channel: {}", e).into())
40 })?;
41 let _ = tx.send(DataIndexingCommand::ProcessCollection(config.uuid));
42 let _ = oneshot_tx.send(rx);
43 self.uuid = config.uuid;
44 self.subgraph_indexing_event_tx = Mutex::new(Some(tx));
45 self.subgraph_request = Some(config.subgraph_request);
46 Ok(())
47 }
48
49 fn on_unload(&mut self) {}
50
51 fn notify_end_of_startup(&self) -> PluginResult<()> {
52 Ok(())
53 }
54
55 fn update_account(
56 &self,
57 account: ReplicaAccountInfoVersions,
58 slot: Slot,
59 _is_startup: bool,
60 ) -> PluginResult<()> {
61 match account {
62 ReplicaAccountInfoVersions::V0_0_1(_info) => {
63 return Err(GeyserPluginError::Custom(
64 "ReplicaAccountInfoVersions::V0_0_1 is not supported, skipping account update"
65 .into(),
66 ));
67 }
68 ReplicaAccountInfoVersions::V0_0_2(_info) => {
69 return Err(GeyserPluginError::Custom(
70 "ReplicaAccountInfoVersions::V0_0_2 is not supported, skipping account update"
71 .into(),
72 ));
73 }
74 ReplicaAccountInfoVersions::V0_0_3(info) => {
75 if info.txn.is_some() {
76 return Ok(()); }
78 }
79 }
80 let Ok(tx) = self.subgraph_indexing_event_tx.lock() else {
81 return Err(GeyserPluginError::Custom(
82 "Failed to lock subgraph indexing sender".into(),
83 ));
84 };
85 let tx = tx.as_ref().ok_or_else(|| {
86 GeyserPluginError::Custom("Failed to lock subgraph indexing sender".into())
87 })?;
88
89 let Some(ref subgraph_request) = self.subgraph_request else {
90 return Ok(());
91 };
92 let mut entries = vec![];
93
94 match account {
95 ReplicaAccountInfoVersions::V0_0_3(info) => {
96 let pubkey_bytes: [u8; 32] =
97 info.pubkey.try_into().expect("pubkey must be 32 bytes");
98 let pubkey = Pubkey::new_from_array(pubkey_bytes);
99 let owner_bytes: [u8; 32] = info
100 .owner
101 .try_into()
102 .expect("owner pubkey must be 32 bytes");
103 let owner = Pubkey::new_from_array(owner_bytes);
104
105 probe_account(
106 &self.account_update_purgatory,
107 &self.pda_mappings,
108 subgraph_request,
109 pubkey,
110 owner,
111 info.data.to_vec(),
112 slot,
113 info.lamports,
114 &mut entries,
115 )
116 .map_err(|e| GeyserPluginError::AccountsUpdateError {
117 msg: format!("{} at slot {} for account {}", e, pubkey, slot),
118 })?;
119 }
120 _ => unreachable!(),
121 };
122
123 if !entries.is_empty() {
124 let data = serde_json::to_vec(&entries).unwrap();
125 let _ = tx.send(DataIndexingCommand::ProcessCollectionEntriesPack(
126 self.uuid, data,
127 ));
128 }
129 Ok(())
130 }
131
132 fn update_slot_status(
133 &self,
134 _slot: Slot,
135 _parent: Option<u64>,
136 _status: &SlotStatus,
137 ) -> PluginResult<()> {
138 Ok(())
139 }
140
141 fn notify_transaction(
142 &self,
143 transaction: ReplicaTransactionInfoVersions,
144 slot: Slot,
145 ) -> PluginResult<()> {
146 let Ok(tx) = self.subgraph_indexing_event_tx.lock() else {
147 return Err(GeyserPluginError::Custom(
148 "Failed to lock subgraph indexing sender".into(),
149 ));
150 };
151 let tx = tx.as_ref().ok_or_else(|| {
152 GeyserPluginError::Custom("Failed to lock subgraph indexing sender".into())
153 })?;
154
155 let Some(ref subgraph_request) = self.subgraph_request else {
156 return Ok(());
157 };
158
159 let mut entries = vec![];
160 match transaction {
161 ReplicaTransactionInfoVersions::V0_0_2(data) => {
162 probe_transaction_legacy(
163 &self.account_update_purgatory,
164 &self.pda_mappings,
165 subgraph_request,
166 data,
167 slot,
168 &mut entries,
169 )
170 .map_err(|e| GeyserPluginError::TransactionUpdateError {
171 msg: format!("{} at slot {}", e, slot),
172 })?;
173 }
174 ReplicaTransactionInfoVersions::V0_0_1(_) => {
175 return Err(GeyserPluginError::Custom(
176 "ReplicaTransactionInfoVersions::V0_0_1 is not supported, skipping transaction"
177 .into(),
178 ));
179 }
180 ReplicaTransactionInfoVersions::V0_0_3(data) => {
181 probe_transaction(
182 &self.account_update_purgatory,
183 &self.pda_mappings,
184 subgraph_request,
185 data,
186 slot,
187 &mut entries,
188 )
189 .map_err(|e| GeyserPluginError::TransactionUpdateError {
190 msg: format!("{} at slot {}", e, slot),
191 })?;
192 }
193 };
194 if !entries.is_empty() {
195 let data = serde_json::to_vec(&entries).unwrap();
196 let _ = tx.send(DataIndexingCommand::ProcessCollectionEntriesPack(
197 self.uuid, data,
198 ));
199 }
200 Ok(())
201 }
202
203 fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> {
204 Ok(())
205 }
206
207 fn notify_block_metadata(&self, _blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> {
208 Ok(())
209 }
210
211 fn account_data_notifications_enabled(&self) -> bool {
212 true
213 }
214
215 fn transaction_notifications_enabled(&self) -> bool {
216 false
217 }
218
219 fn entry_notifications_enabled(&self) -> bool {
220 false
221 }
222}
223
224#[unsafe(no_mangle)]
225#[allow(improper_ctypes_definitions)]
226pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
230 let plugin: Box<dyn GeyserPlugin> = Box::<SurfpoolSubgraphPlugin>::default();
231 Box::into_raw(plugin)
232}
233
234#[allow(clippy::too_many_arguments)]
235pub fn probe_account(
236 purgatory: &Mutex<AccountPurgatory>,
237 pda_mappings: &Mutex<PdaMapping>,
238 subgraph_request: &SubgraphRequest,
239 pubkey: Pubkey,
240 owner: Pubkey,
241 data: Vec<u8>,
242 slot: Slot,
243 lamports: u64,
244 entries: &mut Vec<HashMap<String, Value>>,
245) -> Result<(), String> {
246 let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
247
248 if owner != subgraph_request_v0.program_id {
250 return Ok(());
251 }
252
253 if let Some(pda_source) = PdaMapping::get(pda_mappings, &pubkey).unwrap() {
254 pda_source.evaluate_account_update(
255 &data,
256 subgraph_request,
257 slot,
258 pubkey,
259 owner,
260 lamports,
261 entries,
262 )
263 } else {
264 AccountPurgatory::banish(purgatory, &pubkey, slot, data, owner, lamports)
265 }
266 .map_err(|e| {
267 format!(
268 "Failed to evaluate account update for PDA {}: {}",
269 pubkey, e
270 )
271 })
272}
273
274pub fn probe_transaction(
275 purgatory: &Mutex<AccountPurgatory>,
276 pda_mappings: &Mutex<PdaMapping>,
277 subgraph_request: &SubgraphRequest,
278 data: &ReplicaTransactionInfoV3<'_>,
279 slot: Slot,
280 entries: &mut Vec<HashMap<String, Value>>,
281) -> Result<(), String> {
282 let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
283 if data.is_vote {
284 return Ok(());
285 }
286
287 let transaction = data.transaction;
288 let account_keys = transaction.message.static_account_keys();
290 let account_pubkeys = account_keys.iter().cloned().collect::<Vec<_>>();
291 let is_program_id_match = transaction.message.instructions().iter().any(|ix| {
292 ix.program_id(account_pubkeys.as_ref())
293 .eq(&subgraph_request_v0.program_id)
294 });
295 if !is_program_id_match {
296 return Ok(());
297 }
298
299 match &subgraph_request_v0.data_source {
300 IndexedSubgraphSourceType::Instruction(_) => return Ok(()),
301 IndexedSubgraphSourceType::Event(event_source) =>
302 {
304 if let Some(ref inner_instructions) = data.transaction_status_meta.inner_instructions {
305 event_source
306 .evaluate_inner_instructions(
307 inner_instructions,
308 subgraph_request,
309 slot,
310 *transaction.signatures.first().unwrap(),
311 entries,
312 )
313 .map_err(|e| {
314 format!(
315 "Failed to evaluate inner instructions for event source: {}",
316 e
317 )
318 })?;
319 }
320 }
321 IndexedSubgraphSourceType::Pda(pda_source) => {
322 for instruction in transaction.message.instructions() {
323 let Some(pda) = pda_source.evaluate_instruction(instruction, &account_pubkeys)
324 else {
325 continue;
326 };
327
328 let Some(AccountPurgatoryData {
329 slot,
330 account_data,
331 owner,
332 lamports,
333 }) = AccountPurgatory::release(purgatory, pda_mappings, pda, pda_source.clone())?
334 else {
335 continue;
336 };
337
338 pda_source
339 .evaluate_account_update(
340 &account_data,
341 subgraph_request,
342 slot,
343 pda,
344 owner,
345 lamports,
346 entries,
347 )
348 .map_err(|e| {
349 format!("Failed to evaluate account update for PDA {}: {}", pda, e)
350 })?;
351 }
352 }
353 IndexedSubgraphSourceType::TokenAccount(token_account_source) => {
354 let mut already_found_token_accounts = vec![];
355 for instruction in transaction.message.instructions() {
356 token_account_source
357 .evaluate_instruction(
358 instruction,
359 &account_pubkeys,
360 data.transaction_status_meta,
361 slot,
362 *transaction.signatures.first().unwrap(),
363 subgraph_request,
364 &mut already_found_token_accounts,
365 entries,
366 )
367 .map_err(|e| {
368 format!(
369 "Failed to evaluate instruction for token account source: {}",
370 e
371 )
372 })?;
373 }
374 }
375 }
376 Ok(())
377}
378
379pub fn probe_transaction_legacy(
380 purgatory: &Mutex<AccountPurgatory>,
381 pda_mappings: &Mutex<PdaMapping>,
382 subgraph_request: &SubgraphRequest,
383 data: &ReplicaTransactionInfoV2<'_>,
384 slot: Slot,
385 entries: &mut Vec<HashMap<String, Value>>,
386) -> Result<(), String> {
387 let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
388 if data.is_vote {
389 return Ok(());
390 }
391
392 let transaction = data.transaction;
393 let account_keys = transaction.message().static_account_keys();
395 let account_pubkeys = account_keys.iter().cloned().collect::<Vec<_>>();
396 let is_program_id_match = transaction.message().instructions().iter().any(|ix| {
397 ix.program_id(account_pubkeys.as_ref())
398 .eq(&subgraph_request_v0.program_id)
399 });
400 if !is_program_id_match {
401 return Ok(());
402 }
403
404 match &subgraph_request_v0.data_source {
405 IndexedSubgraphSourceType::Instruction(_) => return Ok(()),
406 IndexedSubgraphSourceType::Event(event_source) =>
407 {
409 if let Some(ref inner_instructions) = data.transaction_status_meta.inner_instructions {
410 event_source
411 .evaluate_inner_instructions(
412 inner_instructions,
413 subgraph_request,
414 slot,
415 *transaction.signature(),
416 entries,
417 )
418 .map_err(|e| {
419 format!(
420 "Failed to evaluate inner instructions for event source: {}",
421 e
422 )
423 })?;
424 }
425 }
426 IndexedSubgraphSourceType::Pda(pda_source) => {
427 for instruction in transaction.message().instructions() {
428 let Some(pda) = pda_source.evaluate_instruction(instruction, &account_pubkeys)
429 else {
430 continue;
431 };
432
433 let Some(AccountPurgatoryData {
434 slot,
435 account_data,
436 owner,
437 lamports,
438 }) = AccountPurgatory::release(purgatory, pda_mappings, pda, pda_source.clone())?
439 else {
440 continue;
441 };
442
443 pda_source
444 .evaluate_account_update(
445 &account_data,
446 subgraph_request,
447 slot,
448 pda,
449 owner,
450 lamports,
451 entries,
452 )
453 .map_err(|e| {
454 format!("Failed to evaluate account update for PDA {}: {}", pda, e)
455 })?;
456 }
457 }
458 IndexedSubgraphSourceType::TokenAccount(token_account_source) => {
459 let mut already_found_token_accounts = vec![];
460 for instruction in transaction.message().instructions() {
461 token_account_source
462 .evaluate_instruction(
463 instruction,
464 &account_pubkeys,
465 data.transaction_status_meta,
466 slot,
467 *transaction.signature(),
468 subgraph_request,
469 &mut already_found_token_accounts,
470 entries,
471 )
472 .map_err(|e| {
473 format!(
474 "Failed to evaluate instruction for token account source: {}",
475 e
476 )
477 })?;
478 }
479 }
480 }
481 Ok(())
482}
483
484#[derive(Default, Debug)]
485pub struct PdaMapping(pub HashMap<Pubkey, PdaSubgraphSource>);
486impl PdaMapping {
487 pub fn new() -> Self {
488 Self(HashMap::new())
489 }
490
491 pub fn insert(&mut self, pubkey: Pubkey, pda_source: PdaSubgraphSource) {
492 self.0.insert(pubkey, pda_source);
493 }
494
495 pub fn _get(&self, pubkey: &Pubkey) -> Option<&PdaSubgraphSource> {
496 self.0.get(pubkey)
497 }
498
499 pub fn get(
500 pda_mapping: &Mutex<Self>,
501 pubkey: &Pubkey,
502 ) -> Result<Option<PdaSubgraphSource>, String> {
503 pda_mapping
504 .lock()
505 .map_err(|e| format!("Failed to lock PdaMapping: {}", e))
506 .map(|mapping| mapping._get(pubkey).cloned())
507 }
508}
509
510#[derive(Default, Debug)]
511pub struct AccountPurgatory(pub HashMap<Pubkey, AccountPurgatoryData>);
512
513impl AccountPurgatory {
514 pub fn new() -> Self {
515 Self(HashMap::new())
516 }
517
518 fn insert(&mut self, pubkey: Pubkey, data: AccountPurgatoryData) {
519 self.0.insert(pubkey, data);
520 }
521
522 fn remove(&mut self, pubkey: &Pubkey) -> Option<AccountPurgatoryData> {
523 self.0.remove(pubkey)
524 }
525
526 pub fn banish(
527 purgatory: &Mutex<Self>,
528 pubkey: &Pubkey,
529 slot: Slot,
530 account_data: Vec<u8>,
531 owner: Pubkey,
532 lamports: u64,
533 ) -> Result<(), String> {
534 purgatory
535 .lock()
536 .map_err(|e| format!("Failed to lock AccountPurgatory: {}", e))
537 .map(|mut purgatory| {
538 purgatory.insert(
539 *pubkey,
540 AccountPurgatoryData {
541 slot,
542 account_data,
543 owner,
544 lamports,
545 },
546 )
547 })
548 }
549
550 pub fn release(
551 purgatory: &Mutex<Self>,
552 pda_mapping: &Mutex<PdaMapping>,
553 pubkey: Pubkey,
554 pda_source: PdaSubgraphSource,
555 ) -> Result<Option<AccountPurgatoryData>, String> {
556 pda_mapping
557 .lock()
558 .map_err(|e| format!("Failed to lock PdaMapping: {}", e))?
559 .insert(pubkey, pda_source);
560
561 purgatory
562 .lock()
563 .map_err(|e| format!("Failed to lock AccountPurgatory: {}", e))
564 .map(|mut purgatory| purgatory.remove(&pubkey))
565 }
566}
567
568#[derive(Default, Debug)]
569pub struct AccountPurgatoryData {
570 slot: Slot,
571 account_data: Vec<u8>,
572 owner: Pubkey,
573 lamports: u64,
574}