use crate::{phoenix_log_authority, state::markets::MarketEvent};
use borsh::BorshSerialize;
use solana_program::{
account_info::AccountInfo,
clock::Clock,
entrypoint::ProgramResult,
instruction::{AccountMeta, Instruction},
program::invoke_signed,
program_error::ProgramError,
pubkey::Pubkey,
sysvar::Sysvar,
};
use super::{
assert_with_msg, checkers::phoenix_checkers::MarketAccountInfo, AuditLogHeader, PhoenixError,
PhoenixInstruction, PhoenixLogContext, PhoenixMarketContext, PhoenixMarketEvent,
};
/// Byte budget for the log instruction. It is used as the initial capacity of the
/// instruction data buffer and as the threshold in `add_event` above which the
/// pending batch is flushed before another event is appended.
const MAX_INNER_INSTRUCTION_SIZE: usize = 1280;
/// Added to the instruction data length when comparing against
/// `MAX_INNER_INSTRUCTION_SIZE`.
/// NOTE(review): presumably the serialized size of the single account meta attached
/// to the log instruction (32-byte key plus two flag bytes) — confirm.
const LOG_IX_ACCOUNT_META_SIZE: usize = 34;
/// Length of the leading portion of the log instruction data that is kept across
/// flushes. The last two bytes of that portion are overwritten with the batch size
/// on every flush.
/// NOTE(review): assumed to equal the one-byte instruction tag plus the serialized
/// header event — confirm against the `AuditLogHeader` layout.
const HEADER_LEN: usize = 93;
/// Initial capacity of the scratch buffer that each event is serialized into.
/// NOTE(review): presumably an upper bound on the serialized size of one event — confirm.
const MAX_EVENT_SIZE: usize = 67;
/// Collects market events into the data of a log instruction addressed to
/// `crate::id()` and emits them in batches via `invoke_signed`.
///
/// Failures while adding events are remembered in `error_code` instead of being
/// returned immediately; they are reported by
/// `increment_market_sequence_number_and_flush`.
pub(crate) struct EventRecorder<'info> {
/// Account passed to `invoke_signed` when a batch is flushed.
phoenix_program: AccountInfo<'info>,
/// Account passed to `invoke_signed` when a batch is flushed; the log instruction
/// lists `phoenix_log_authority::id()` as a read-only signer.
log_authority: AccountInfo<'info>,
/// Instruction being recorded; written into the header and checked when the
/// market account turns out to be empty.
phoenix_instruction: PhoenixInstruction,
/// Staging area: each event is serialized here before being appended to the
/// instruction data. Expected to be empty at the start of every `add_event` call.
scratch_buffer: Vec<u8>,
/// Instruction whose data holds the header followed by the events of the
/// current batch.
pub log_instruction: Instruction,
/// Counters for events added, events already emitted, and batches sent.
state_tracker: EventStateTracker,
/// Error recorded by `add_event`, if any. Once set, later `add_event` calls
/// return without doing anything.
error_code: Option<PhoenixError>,
}
impl<'info> EventRecorder<'info> {
pub(crate) fn new<'a>(
phoenix_log_context: PhoenixLogContext<'a, 'info>,
phoenix_market_context: &PhoenixMarketContext<'a, 'info>,
phoenix_instruction: PhoenixInstruction,
) -> Result<Self, ProgramError> {
let PhoenixLogContext {
phoenix_program,
log_authority,
} = phoenix_log_context;
let PhoenixMarketContext {
market_info,
signer,
} = phoenix_market_context;
let header = market_info.get_header()?;
let clock = Clock::get()?;
let mut data = Vec::with_capacity(MAX_INNER_INSTRUCTION_SIZE);
data.push(PhoenixInstruction::Log as u8);
PhoenixMarketEvent::Header(AuditLogHeader {
instruction: phoenix_instruction as u8,
sequence_number: header.market_sequence_number,
timestamp: clock.unix_timestamp,
slot: clock.slot,
market: *market_info.key,
signer: *signer.key,
total_events: 0, })
.serialize(&mut data)?;
Ok(Self {
phoenix_program: phoenix_program.as_ref().clone(),
log_authority: log_authority.as_ref().clone(),
phoenix_instruction,
scratch_buffer: Vec::with_capacity(MAX_EVENT_SIZE),
log_instruction: Instruction {
program_id: crate::id(),
accounts: vec![AccountMeta::new_readonly(phoenix_log_authority::id(), true)],
data,
},
state_tracker: EventStateTracker::default(),
error_code: None,
})
}
pub(crate) fn flush(&mut self) -> ProgramResult {
let batch_size = self.state_tracker.get_batch_size();
self.state_tracker.print_status();
self.log_instruction.data[(HEADER_LEN - 2)..HEADER_LEN]
.copy_from_slice(&batch_size.to_le_bytes());
invoke_signed(
&self.log_instruction,
&[
self.phoenix_program.as_ref().clone(),
self.log_authority.as_ref().clone(),
],
&[&[b"log", &[phoenix_log_authority::bump()]]],
)?;
self.log_instruction.data.drain(HEADER_LEN..);
self.state_tracker.process_events();
Ok(())
}
pub(crate) fn add_event(&mut self, event: MarketEvent<Pubkey>) {
if self.error_code.is_some() {
return;
}
let mut event = PhoenixMarketEvent::from(event);
event.set_index(self.state_tracker.events_added);
if !self.scratch_buffer.is_empty() {
self.error_code = Some(PhoenixError::NonEmptyScratchBuffer);
return;
}
if event.serialize(&mut self.scratch_buffer).is_err() {
self.error_code = Some(PhoenixError::FailedToSerializeEvent);
return;
}
let data_len = self.log_instruction.data.len() + self.scratch_buffer.len();
if data_len + LOG_IX_ACCOUNT_META_SIZE > MAX_INNER_INSTRUCTION_SIZE && self.flush().is_err()
{
self.error_code = Some(PhoenixError::FailedToFlushBuffer);
return;
}
self.log_instruction
.data
.extend_from_slice(&self.scratch_buffer);
self.state_tracker.add_event();
self.scratch_buffer.drain(..);
}
pub(crate) fn increment_market_sequence_number_and_flush(
&mut self,
market_info: MarketAccountInfo<'_, 'info>,
) -> ProgramResult {
if let Some(err) = self.error_code {
phoenix_log!("ERROR: Event recorder failed to record events: {}", err);
return Err(err.into());
}
if market_info.data_is_empty() {
assert_with_msg(
self.phoenix_instruction == PhoenixInstruction::ChangeMarketStatus,
ProgramError::InvalidInstructionData,
"The only instruction that can be used to delete a market is ChangeMarketStatus",
)?;
} else {
market_info.get_header_mut()?.increment_sequence_number();
};
if self.state_tracker.has_events_to_process() {
self.flush()?;
}
Ok(())
}
}
/// Bookkeeping for how many events have been added and emitted, and how many
/// batches have been sent. All counters start at zero via `Default`.
#[derive(Default)]
pub(crate) struct EventStateTracker {
/// Number of batches processed so far; incremented by `process_events`.
batch_index: usize,
/// Value of `events_added` at the time of the most recent `process_events` call.
events_emitted: u16,
/// Total number of events added; incremented by `add_event`.
events_added: u16,
}
impl EventStateTracker {
    /// Logs the one-based number of the batch about to be sent, the number of
    /// events in it, and the total number of events added so far.
    pub(crate) fn print_status(&self) {
        phoenix_log!(
            "Sending batch {} with header and {} market events, total events sent: {}",
            self.batch_index + 1,
            self.get_batch_size(),
            self.events_added,
        );
    }

    /// Returns how many events have been added since the last processed batch.
    pub(crate) fn get_batch_size(&self) -> u16 {
        let already_emitted = self.events_emitted;
        self.events_added - already_emitted
    }

    /// Marks every added event as emitted and advances to the next batch.
    pub(crate) fn process_events(&mut self) {
        self.batch_index += 1;
        self.events_emitted = self.events_added;
    }

    /// Records that one more event has been added.
    pub(crate) fn add_event(&mut self) {
        self.events_added += 1;
    }

    /// Returns `true` when no batch has been processed yet, or when events have
    /// been added since the last processed batch.
    pub(crate) fn has_events_to_process(&self) -> bool {
        if self.batch_index == 0 {
            return true;
        }
        self.events_added > self.events_emitted
    }
}