Skip to main content

surfpool_subgraph/
lib.rs

1use std::{collections::HashMap, sync::Mutex};
2
3use agave_geyser_plugin_interface::geyser_plugin_interface::{
4    GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
5    ReplicaEntryInfoVersions, ReplicaTransactionInfoV2, ReplicaTransactionInfoV3,
6    ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
7};
8use ipc_channel::ipc::IpcSender;
9use solana_clock::Slot;
10use surfpool_types::{DataIndexingCommand, SubgraphPluginConfig};
11use txtx_addon_kit::types::types::Value;
12use txtx_addon_network_svm::Pubkey;
13use txtx_addon_network_svm_types::subgraph::{
14    IndexedSubgraphSourceType, PdaSubgraphSource, SubgraphRequest,
15};
16use uuid::Uuid;
17
18#[derive(Default, Debug)]
19pub struct SurfpoolSubgraphPlugin {
20    pub uuid: Uuid,
21    subgraph_indexing_event_tx: Mutex<Option<IpcSender<DataIndexingCommand>>>,
22    subgraph_request: Option<SubgraphRequest>,
23    pda_mappings: Mutex<PdaMapping>,
24    account_update_purgatory: Mutex<AccountPurgatory>,
25}
26
27impl GeyserPlugin for SurfpoolSubgraphPlugin {
28    fn name(&self) -> &'static str {
29        "surfpool-subgraph"
30    }
31
32    fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> {
33        let config = serde_json::from_str::<SubgraphPluginConfig>(config_file)
34            .map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?;
35        let oneshot_tx = IpcSender::connect(config.ipc_token).map_err(|e| {
36            GeyserPluginError::Custom(format!("Failed to connect IPC sender: {}", e).into())
37        })?;
38        let (tx, rx) = ipc_channel::ipc::channel().map_err(|e| {
39            GeyserPluginError::Custom(format!("Failed to create IPC channel: {}", e).into())
40        })?;
41        let _ = tx.send(DataIndexingCommand::ProcessCollection(config.uuid));
42        let _ = oneshot_tx.send(rx);
43        self.uuid = config.uuid;
44        self.subgraph_indexing_event_tx = Mutex::new(Some(tx));
45        self.subgraph_request = Some(config.subgraph_request);
46        Ok(())
47    }
48
49    fn on_unload(&mut self) {}
50
51    fn notify_end_of_startup(&self) -> PluginResult<()> {
52        Ok(())
53    }
54
55    fn update_account(
56        &self,
57        account: ReplicaAccountInfoVersions,
58        slot: Slot,
59        _is_startup: bool,
60    ) -> PluginResult<()> {
61        match account {
62            ReplicaAccountInfoVersions::V0_0_1(_info) => {
63                return Err(GeyserPluginError::Custom(
64                    "ReplicaAccountInfoVersions::V0_0_1 is not supported, skipping account update"
65                        .into(),
66                ));
67            }
68            ReplicaAccountInfoVersions::V0_0_2(_info) => {
69                return Err(GeyserPluginError::Custom(
70                    "ReplicaAccountInfoVersions::V0_0_2 is not supported, skipping account update"
71                        .into(),
72                ));
73            }
74            ReplicaAccountInfoVersions::V0_0_3(info) => {
75                if info.txn.is_some() {
76                    return Ok(()); // We only care about account updates _without_ a transaction, indicating it's a post-block update rather than post-transaction
77                }
78            }
79        }
80        let Ok(tx) = self.subgraph_indexing_event_tx.lock() else {
81            return Err(GeyserPluginError::Custom(
82                "Failed to lock subgraph indexing sender".into(),
83            ));
84        };
85        let tx = tx.as_ref().ok_or_else(|| {
86            GeyserPluginError::Custom("Failed to lock subgraph indexing sender".into())
87        })?;
88
89        let Some(ref subgraph_request) = self.subgraph_request else {
90            return Ok(());
91        };
92        let mut entries = vec![];
93
94        match account {
95            ReplicaAccountInfoVersions::V0_0_3(info) => {
96                let pubkey_bytes: [u8; 32] =
97                    info.pubkey.try_into().expect("pubkey must be 32 bytes");
98                let pubkey = Pubkey::new_from_array(pubkey_bytes);
99                let owner_bytes: [u8; 32] = info
100                    .owner
101                    .try_into()
102                    .expect("owner pubkey must be 32 bytes");
103                let owner = Pubkey::new_from_array(owner_bytes);
104
105                probe_account(
106                    &self.account_update_purgatory,
107                    &self.pda_mappings,
108                    subgraph_request,
109                    pubkey,
110                    owner,
111                    info.data.to_vec(),
112                    slot,
113                    info.lamports,
114                    &mut entries,
115                )
116                .map_err(|e| GeyserPluginError::AccountsUpdateError {
117                    msg: format!("{} at slot {} for account {}", e, pubkey, slot),
118                })?;
119            }
120            _ => unreachable!(),
121        };
122
123        if !entries.is_empty() {
124            let data = serde_json::to_vec(&entries).unwrap();
125            let _ = tx.send(DataIndexingCommand::ProcessCollectionEntriesPack(
126                self.uuid, data,
127            ));
128        }
129        Ok(())
130    }
131
132    fn update_slot_status(
133        &self,
134        _slot: Slot,
135        _parent: Option<u64>,
136        _status: &SlotStatus,
137    ) -> PluginResult<()> {
138        Ok(())
139    }
140
141    fn notify_transaction(
142        &self,
143        transaction: ReplicaTransactionInfoVersions,
144        slot: Slot,
145    ) -> PluginResult<()> {
146        let Ok(tx) = self.subgraph_indexing_event_tx.lock() else {
147            return Err(GeyserPluginError::Custom(
148                "Failed to lock subgraph indexing sender".into(),
149            ));
150        };
151        let tx = tx.as_ref().ok_or_else(|| {
152            GeyserPluginError::Custom("Failed to lock subgraph indexing sender".into())
153        })?;
154
155        let Some(ref subgraph_request) = self.subgraph_request else {
156            return Ok(());
157        };
158
159        let mut entries = vec![];
160        match transaction {
161            ReplicaTransactionInfoVersions::V0_0_2(data) => {
162                probe_transaction_legacy(
163                    &self.account_update_purgatory,
164                    &self.pda_mappings,
165                    subgraph_request,
166                    data,
167                    slot,
168                    &mut entries,
169                )
170                .map_err(|e| GeyserPluginError::TransactionUpdateError {
171                    msg: format!("{} at slot {}", e, slot),
172                })?;
173            }
174            ReplicaTransactionInfoVersions::V0_0_1(_) => {
175                return Err(GeyserPluginError::Custom(
176                    "ReplicaTransactionInfoVersions::V0_0_1 is not supported, skipping transaction"
177                        .into(),
178                ));
179            }
180            ReplicaTransactionInfoVersions::V0_0_3(data) => {
181                probe_transaction(
182                    &self.account_update_purgatory,
183                    &self.pda_mappings,
184                    subgraph_request,
185                    data,
186                    slot,
187                    &mut entries,
188                )
189                .map_err(|e| GeyserPluginError::TransactionUpdateError {
190                    msg: format!("{} at slot {}", e, slot),
191                })?;
192            }
193        };
194        if !entries.is_empty() {
195            let data = serde_json::to_vec(&entries).unwrap();
196            let _ = tx.send(DataIndexingCommand::ProcessCollectionEntriesPack(
197                self.uuid, data,
198            ));
199        }
200        Ok(())
201    }
202
203    fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> {
204        Ok(())
205    }
206
207    fn notify_block_metadata(&self, _blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> {
208        Ok(())
209    }
210
211    fn account_data_notifications_enabled(&self) -> bool {
212        true
213    }
214
215    fn transaction_notifications_enabled(&self) -> bool {
216        false
217    }
218
219    fn entry_notifications_enabled(&self) -> bool {
220        false
221    }
222}
223
224#[unsafe(no_mangle)]
225#[allow(improper_ctypes_definitions)]
226/// # Safety
227///
228/// This function returns the Plugin pointer as trait GeyserPlugin.
229pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
230    let plugin: Box<dyn GeyserPlugin> = Box::<SurfpoolSubgraphPlugin>::default();
231    Box::into_raw(plugin)
232}
233
234#[allow(clippy::too_many_arguments)]
235pub fn probe_account(
236    purgatory: &Mutex<AccountPurgatory>,
237    pda_mappings: &Mutex<PdaMapping>,
238    subgraph_request: &SubgraphRequest,
239    pubkey: Pubkey,
240    owner: Pubkey,
241    data: Vec<u8>,
242    slot: Slot,
243    lamports: u64,
244    entries: &mut Vec<HashMap<String, Value>>,
245) -> Result<(), String> {
246    let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
247
248    // Only process accounts owned by the configured program
249    if owner != subgraph_request_v0.program_id {
250        return Ok(());
251    }
252
253    if let Some(pda_source) = PdaMapping::get(pda_mappings, &pubkey).unwrap() {
254        pda_source.evaluate_account_update(
255            &data,
256            subgraph_request,
257            slot,
258            pubkey,
259            owner,
260            lamports,
261            entries,
262        )
263    } else {
264        AccountPurgatory::banish(purgatory, &pubkey, slot, data, owner, lamports)
265    }
266    .map_err(|e| {
267        format!(
268            "Failed to evaluate account update for PDA {}: {}",
269            pubkey, e
270        )
271    })
272}
273
274pub fn probe_transaction(
275    purgatory: &Mutex<AccountPurgatory>,
276    pda_mappings: &Mutex<PdaMapping>,
277    subgraph_request: &SubgraphRequest,
278    data: &ReplicaTransactionInfoV3<'_>,
279    slot: Slot,
280    entries: &mut Vec<HashMap<String, Value>>,
281) -> Result<(), String> {
282    let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
283    if data.is_vote {
284        return Ok(());
285    }
286
287    let transaction = data.transaction;
288    // FIXME: for versioned messages we have to handle also dynamic keys
289    let account_keys = transaction.message.static_account_keys();
290    let account_pubkeys = account_keys.iter().cloned().collect::<Vec<_>>();
291    let is_program_id_match = transaction.message.instructions().iter().any(|ix| {
292        ix.program_id(account_pubkeys.as_ref())
293            .eq(&subgraph_request_v0.program_id)
294    });
295    if !is_program_id_match {
296        return Ok(());
297    }
298
299    match &subgraph_request_v0.data_source {
300        IndexedSubgraphSourceType::Instruction(_) => return Ok(()),
301        IndexedSubgraphSourceType::Event(event_source) =>
302        // Check inner instructions
303        {
304            if let Some(ref inner_instructions) = data.transaction_status_meta.inner_instructions {
305                event_source
306                    .evaluate_inner_instructions(
307                        inner_instructions,
308                        subgraph_request,
309                        slot,
310                        *transaction.signatures.first().unwrap(),
311                        entries,
312                    )
313                    .map_err(|e| {
314                        format!(
315                            "Failed to evaluate inner instructions for event source: {}",
316                            e
317                        )
318                    })?;
319            }
320        }
321        IndexedSubgraphSourceType::Pda(pda_source) => {
322            for instruction in transaction.message.instructions() {
323                let Some(pda) = pda_source.evaluate_instruction(instruction, &account_pubkeys)
324                else {
325                    continue;
326                };
327
328                let Some(AccountPurgatoryData {
329                    slot,
330                    account_data,
331                    owner,
332                    lamports,
333                }) = AccountPurgatory::release(purgatory, pda_mappings, pda, pda_source.clone())?
334                else {
335                    continue;
336                };
337
338                pda_source
339                    .evaluate_account_update(
340                        &account_data,
341                        subgraph_request,
342                        slot,
343                        pda,
344                        owner,
345                        lamports,
346                        entries,
347                    )
348                    .map_err(|e| {
349                        format!("Failed to evaluate account update for PDA {}: {}", pda, e)
350                    })?;
351            }
352        }
353        IndexedSubgraphSourceType::TokenAccount(token_account_source) => {
354            let mut already_found_token_accounts = vec![];
355            for instruction in transaction.message.instructions() {
356                token_account_source
357                    .evaluate_instruction(
358                        instruction,
359                        &account_pubkeys,
360                        data.transaction_status_meta,
361                        slot,
362                        *transaction.signatures.first().unwrap(),
363                        subgraph_request,
364                        &mut already_found_token_accounts,
365                        entries,
366                    )
367                    .map_err(|e| {
368                        format!(
369                            "Failed to evaluate instruction for token account source: {}",
370                            e
371                        )
372                    })?;
373            }
374        }
375    }
376    Ok(())
377}
378
379pub fn probe_transaction_legacy(
380    purgatory: &Mutex<AccountPurgatory>,
381    pda_mappings: &Mutex<PdaMapping>,
382    subgraph_request: &SubgraphRequest,
383    data: &ReplicaTransactionInfoV2<'_>,
384    slot: Slot,
385    entries: &mut Vec<HashMap<String, Value>>,
386) -> Result<(), String> {
387    let SubgraphRequest::V0(subgraph_request_v0) = subgraph_request;
388    if data.is_vote {
389        return Ok(());
390    }
391
392    let transaction = data.transaction;
393    // FIXME: for versioned messages we have to handle also dynamic keys
394    let account_keys = transaction.message().static_account_keys();
395    let account_pubkeys = account_keys.iter().cloned().collect::<Vec<_>>();
396    let is_program_id_match = transaction.message().instructions().iter().any(|ix| {
397        ix.program_id(account_pubkeys.as_ref())
398            .eq(&subgraph_request_v0.program_id)
399    });
400    if !is_program_id_match {
401        return Ok(());
402    }
403
404    match &subgraph_request_v0.data_source {
405        IndexedSubgraphSourceType::Instruction(_) => return Ok(()),
406        IndexedSubgraphSourceType::Event(event_source) =>
407        // Check inner instructions
408        {
409            if let Some(ref inner_instructions) = data.transaction_status_meta.inner_instructions {
410                event_source
411                    .evaluate_inner_instructions(
412                        inner_instructions,
413                        subgraph_request,
414                        slot,
415                        *transaction.signature(),
416                        entries,
417                    )
418                    .map_err(|e| {
419                        format!(
420                            "Failed to evaluate inner instructions for event source: {}",
421                            e
422                        )
423                    })?;
424            }
425        }
426        IndexedSubgraphSourceType::Pda(pda_source) => {
427            for instruction in transaction.message().instructions() {
428                let Some(pda) = pda_source.evaluate_instruction(instruction, &account_pubkeys)
429                else {
430                    continue;
431                };
432
433                let Some(AccountPurgatoryData {
434                    slot,
435                    account_data,
436                    owner,
437                    lamports,
438                }) = AccountPurgatory::release(purgatory, pda_mappings, pda, pda_source.clone())?
439                else {
440                    continue;
441                };
442
443                pda_source
444                    .evaluate_account_update(
445                        &account_data,
446                        subgraph_request,
447                        slot,
448                        pda,
449                        owner,
450                        lamports,
451                        entries,
452                    )
453                    .map_err(|e| {
454                        format!("Failed to evaluate account update for PDA {}: {}", pda, e)
455                    })?;
456            }
457        }
458        IndexedSubgraphSourceType::TokenAccount(token_account_source) => {
459            let mut already_found_token_accounts = vec![];
460            for instruction in transaction.message().instructions() {
461                token_account_source
462                    .evaluate_instruction(
463                        instruction,
464                        &account_pubkeys,
465                        data.transaction_status_meta,
466                        slot,
467                        *transaction.signature(),
468                        subgraph_request,
469                        &mut already_found_token_accounts,
470                        entries,
471                    )
472                    .map_err(|e| {
473                        format!(
474                            "Failed to evaluate instruction for token account source: {}",
475                            e
476                        )
477                    })?;
478            }
479        }
480    }
481    Ok(())
482}
483
484#[derive(Default, Debug)]
485pub struct PdaMapping(pub HashMap<Pubkey, PdaSubgraphSource>);
486impl PdaMapping {
487    pub fn new() -> Self {
488        Self(HashMap::new())
489    }
490
491    pub fn insert(&mut self, pubkey: Pubkey, pda_source: PdaSubgraphSource) {
492        self.0.insert(pubkey, pda_source);
493    }
494
495    pub fn _get(&self, pubkey: &Pubkey) -> Option<&PdaSubgraphSource> {
496        self.0.get(pubkey)
497    }
498
499    pub fn get(
500        pda_mapping: &Mutex<Self>,
501        pubkey: &Pubkey,
502    ) -> Result<Option<PdaSubgraphSource>, String> {
503        pda_mapping
504            .lock()
505            .map_err(|e| format!("Failed to lock PdaMapping: {}", e))
506            .map(|mapping| mapping._get(pubkey).cloned())
507    }
508}
509
510#[derive(Default, Debug)]
511pub struct AccountPurgatory(pub HashMap<Pubkey, AccountPurgatoryData>);
512
513impl AccountPurgatory {
514    pub fn new() -> Self {
515        Self(HashMap::new())
516    }
517
518    fn insert(&mut self, pubkey: Pubkey, data: AccountPurgatoryData) {
519        self.0.insert(pubkey, data);
520    }
521
522    fn remove(&mut self, pubkey: &Pubkey) -> Option<AccountPurgatoryData> {
523        self.0.remove(pubkey)
524    }
525
526    pub fn banish(
527        purgatory: &Mutex<Self>,
528        pubkey: &Pubkey,
529        slot: Slot,
530        account_data: Vec<u8>,
531        owner: Pubkey,
532        lamports: u64,
533    ) -> Result<(), String> {
534        purgatory
535            .lock()
536            .map_err(|e| format!("Failed to lock AccountPurgatory: {}", e))
537            .map(|mut purgatory| {
538                purgatory.insert(
539                    *pubkey,
540                    AccountPurgatoryData {
541                        slot,
542                        account_data,
543                        owner,
544                        lamports,
545                    },
546                )
547            })
548    }
549
550    pub fn release(
551        purgatory: &Mutex<Self>,
552        pda_mapping: &Mutex<PdaMapping>,
553        pubkey: Pubkey,
554        pda_source: PdaSubgraphSource,
555    ) -> Result<Option<AccountPurgatoryData>, String> {
556        pda_mapping
557            .lock()
558            .map_err(|e| format!("Failed to lock PdaMapping: {}", e))?
559            .insert(pubkey, pda_source);
560
561        purgatory
562            .lock()
563            .map_err(|e| format!("Failed to lock AccountPurgatory: {}", e))
564            .map(|mut purgatory| purgatory.remove(&pubkey))
565    }
566}
567
568#[derive(Default, Debug)]
569pub struct AccountPurgatoryData {
570    slot: Slot,
571    account_data: Vec<u8>,
572    owner: Pubkey,
573    lamports: u64,
574}