use anyhow::{anyhow, Result};
use solana_transaction_status_client_types::TransactionDetails;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use tokio::time::{sleep, Duration};
use tape_client::{get_slot, get_blocks_with_limit, get_block_by_number, get_archive_account};
use reqwest::Client as HttpClient;
use serde_json::json;
use base64::decode;
use super::block::{process_block, ProcessedBlock};
use super::store::TapeStore;
/// Drives the archiver: optionally syncs from a trusted peer, then polls for
/// new blocks forever.
///
/// * `store` - local tape store that receives archived data and health records.
/// * `client` - RPC client used for slot and block queries.
/// * `starting_slot` - when set, used both as the initial slot tip and as the
///   last processed slot.
/// * `trusted_peer` - when set, URL handed to `sync_with_trusted_peer` before
///   the polling loop starts.
///
/// # Errors
///
/// Returns an error only if the initial peer sync or the initial slot query
/// fails. Once the loop is running, per-iteration failures are logged and the
/// loop continues, so the function never returns `Ok`.
pub async fn archive_loop(
    store: &TapeStore,
    client: &RpcClient,
    starting_slot: Option<u64>,
    trusted_peer: Option<String>,
) -> Result<()> {
    if let Some(peer_url) = trusted_peer.as_deref() {
        println!("DEBUG: Syncing with trusted peer");
        sync_with_trusted_peer(store, client, peer_url).await?;
    }

    let poll_interval = Duration::from_secs(2);

    // Only ask the RPC node for the tip when the caller did not pin a slot.
    let mut latest_slot = if let Some(pinned) = starting_slot {
        pinned
    } else {
        get_slot(client).await?
    };
    println!("DEBUG: Initial slot tip: {}", latest_slot);

    // Resume point precedence: explicit starting slot, then the slot recorded
    // in the store's health entry, then the tip determined above.
    let mut last_processed_slot = match starting_slot {
        Some(pinned) => pinned,
        None => match store.get_health() {
            Ok((recorded, _)) => recorded,
            Err(_) => latest_slot,
        },
    };

    let mut iteration_count: u64 = 0;
    loop {
        let outcome = try_archive_iteration(
            store,
            client,
            &mut latest_slot,
            &mut last_processed_slot,
            &mut iteration_count,
        )
        .await;

        if let Err(e) = outcome {
            eprintln!("ERROR: Block processing iteration failed: {:?}", e);
        } else {
            println!("DEBUG: Block processing iteration completed successfully");
        }

        // Health is reported after every pass, whether or not it succeeded.
        drift_status(store, latest_slot, last_processed_slot);
        sleep(poll_interval).await;
    }
}
/// Performs one archiving pass: fetches the slots after `last_processed_slot`
/// and archives each block in order.
///
/// On every tenth call (tracked through `iteration_count`) the slot tip in
/// `latest_slot` is refreshed; a failed refresh is logged and otherwise
/// ignored. `last_processed_slot` is advanced after each block that is handled
/// successfully, so a failure mid-batch keeps the progress made so far.
///
/// # Errors
///
/// Returns the first error from listing slots, fetching a block, processing a
/// block, or writing it to the store.
async fn try_archive_iteration(
    store: &TapeStore,
    client: &RpcClient,
    latest_slot: &mut u64,
    last_processed_slot: &mut u64,
    iteration_count: &mut u64,
) -> Result<()> {
    *iteration_count += 1;

    let refresh_tip = *iteration_count % 10 == 0;
    if refresh_tip {
        match get_slot(client).await {
            Ok(tip) => {
                *latest_slot = tip;
                println!("DEBUG: Updated slot tip: {}", tip);
            }
            Err(_) => println!("DEBUG: Failed to get slot tip"),
        }
    }

    let first_unprocessed = *last_processed_slot + 1;
    let pending = get_blocks_with_limit(client, first_unprocessed, 100).await?;
    println!("DEBUG: Fetched {} new slots from {}", pending.len(), first_unprocessed);

    for slot_number in pending {
        let raw_block =
            get_block_by_number(client, slot_number, TransactionDetails::Full).await?;
        let parsed = process_block(raw_block, slot_number)?;

        // Blocks without tapes or writes are not written to the store but
        // still count as processed.
        let nothing_to_store = parsed.tapes.is_empty() && parsed.writes.is_empty();
        if !nothing_to_store {
            archive_block(store, &parsed)?;
        }
        *last_processed_slot = slot_number;
    }

    Ok(())
}
/// Writes the contents of a processed block into the store.
///
/// All tape registrations in `block.tapes` are written first, followed by the
/// segment payloads in `block.writes`. Stops at, and returns, the first store
/// error.
fn archive_block(store: &TapeStore, block: &ProcessedBlock) -> Result<()> {
    for (tape_address, tape_number) in block.tapes.iter() {
        store.add_tape(*tape_number, tape_address)?;
    }

    for ((tape_address, segment_number), payload) in block.writes.iter() {
        // The store takes the payload by value, so the borrowed data is copied.
        let owned_payload = payload.clone();
        store.add_segment(tape_address, *segment_number, owned_payload)?;
    }

    Ok(())
}
/// Backfills tapes that are missing from the local store by querying a
/// trusted peer over JSON-RPC.
///
/// The number of tapes is read from the archive account (`tapes_stored`).
/// For every tape number from 1 through that count that the store cannot
/// resolve to an address, the peer is asked for the tape address
/// (`getTapeAddress`) and then for the tape's segments (`getTape`).
///
/// * `store` - local tape store to fill.
/// * `client` - RPC client used to read the archive account.
/// * `trusted_peer_url` - endpoint that receives the JSON-RPC POST requests.
///
/// # Errors
///
/// Returns an error if the archive account cannot be read, an HTTP request or
/// JSON decode fails, a response is missing an expected field, the address or
/// base64 payload cannot be parsed, or a store write fails.
async fn sync_with_trusted_peer(
    store: &TapeStore,
    client: &RpcClient,
    trusted_peer_url: &str,
) -> Result<()> {
    let (archive, _) = get_archive_account(client).await?;
    let total = archive.tapes_stored;
    let http = HttpClient::new();

    // Inclusive range: `1..(total + 1)` would overflow if `total` were the
    // integer maximum.
    for tape_number in 1..=total {
        // Tapes the store can already resolve are skipped.
        if store.get_tape_address(tape_number).is_ok() {
            continue;
        }

        let addr_resp = http
            .post(trusted_peer_url)
            .header("Content-Type", "application/json")
            .body(
                json!({
                    "jsonrpc": "2.0", "id": 1,
                    "method": "getTapeAddress",
                    "params": { "tape_number": tape_number }
                })
                .to_string(),
            )
            .send()
            .await?
            .json::<serde_json::Value>()
            .await?;
        let addr_str = addr_resp["result"]
            .as_str()
            .ok_or_else(|| anyhow!("Invalid getTapeAddress response: {:?}", addr_resp))?;
        let tape_address: Pubkey = addr_str.parse()?;

        let seg_resp = http
            .post(trusted_peer_url)
            .header("Content-Type", "application/json")
            .body(
                json!({
                    "jsonrpc": "2.0", "id": 4,
                    "method": "getTape",
                    "params": { "tape_address": addr_str }
                })
                .to_string(),
            )
            .send()
            .await?
            .json::<serde_json::Value>()
            .await?;
        let segments = seg_resp["result"]
            .as_array()
            .ok_or_else(|| anyhow!("Invalid getTape response: {:?}", seg_resp))?;

        // Validate and decode every segment BEFORE writing anything. The skip
        // check above keys off the tape address being present, so recording the
        // tape first and then failing on the network or on a malformed payload
        // would (presumably) cause the tape to be skipped on later syncs while
        // its segments are still missing.
        let mut decoded = Vec::with_capacity(segments.len());
        for seg in segments {
            let seg_num = seg["segment_number"]
                .as_u64()
                .ok_or_else(|| anyhow!("Invalid segment_number: {:?}", seg))?;
            let data_b64 = seg["data"]
                .as_str()
                .ok_or_else(|| anyhow!("Invalid data field: {:?}", seg))?;
            decoded.push((seg_num, decode(data_b64)?));
        }

        // Store writes keep their original order: tape first, then segments.
        store.add_tape(tape_number, &tape_address)?;
        for (seg_num, data) in decoded {
            store.add_segment(&tape_address, seg_num, data)?;
        }
    }

    Ok(())
}
/// Records and logs how far archiving lags behind the slot tip.
///
/// The drift is `latest_slot - last_processed_slot`, clamped at zero. It is
/// written to the store's health record together with `last_processed_slot`;
/// a failed write is logged and does not abort. The drift is then classified
/// as "Healthy" (below 50), "Slightly behind" (below 200) or "Falling behind"
/// and printed.
fn drift_status(
    store: &TapeStore,
    latest_slot: u64,
    last_processed_slot: u64,
) {
    let drift = latest_slot.saturating_sub(last_processed_slot);

    match store.update_health(last_processed_slot, drift) {
        Ok(_) => {}
        Err(e) => println!("ERROR: failed to write health metadata: {:?}", e),
    }

    let health_status = match drift {
        0..=49 => "Healthy",
        50..=199 => "Slightly behind",
        _ => "Falling behind",
    };

    println!(
        "DEBUG: Drift {} slots behind tip ({}), status: {}",
        drift, latest_slot, health_status
    );
}