ptcow 0.6.0

Library for editing and playback of PxTone (.ptcop) music
Documentation
use {
    crate::{
        Meas, NATIVE_SAMPLE_RATE, SampleRate, SampleT, Voice,
        event::{Event, EventPayload},
        herd::{Herd, MooInstructions, Song},
        master::Master,
        pulse_frequency::PULSE_FREQ,
        timing::{self, Tick, meas_to_sample},
        unit::{MAX_CHANNEL, PanTimeBuf},
        util::ArrayLenExt as _,
    },
    std::ops::ControlFlow,
};

/// Get the current [`Tick`] the playback is at.
#[expect(
    clippy::cast_sign_loss,
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation
)]
#[must_use]
pub fn current_tick(herd: &Herd, ins: &MooInstructions) -> Tick {
    (herd.smp_count as f32 / ins.samples_per_tick) as u32
}

pub trait OutSample {
    fn from_moo_samp(moo_samp: i32) -> Self;
}

impl OutSample for i16 {
    #[expect(clippy::cast_possible_truncation)]
    fn from_moo_samp(moo_samp: i32) -> Self {
        moo_samp.clamp(i32::from(Self::MIN), i32::from(Self::MAX)) as Self
    }
}

pub(super) fn next_sample<T: OutSample>(
    herd: &mut Herd,
    ins: &MooInstructions,
    events: &[Event],
    master: &Master,
    dst_sps: SampleRate,
    out: &mut [T; 2],
    advance: bool,
    extra_units: &mut [crate::Unit],
    extra_voices: &[Voice],
) -> bool {
    for unit in herd.units.iter_mut().chain(extra_units.iter_mut()) {
        unit.tone_envelope(&ins.voices, extra_voices);
    }

    if advance {
        let clock = current_tick(herd, ins);

        while herd.evt_idx < events.len() && (events[herd.evt_idx]).tick <= clock {
            if do_next_event(herd, ins, events, master, clock, dst_sps, extra_voices).is_break() {
                break;
            }
        }
    }

    for unit in herd.units.iter_mut().chain(extra_units.iter_mut()) {
        unit.tone_sample(
            herd.time_pan_index,
            herd.smp_smooth,
            &ins.voices,
            extra_voices,
        );
    }

    for ch in 0..MAX_CHANNEL {
        let mut group_smps = [0; _];
        for unit in herd.units.iter_mut().chain(extra_units.iter_mut()) {
            if !unit.mute {
                unit.tone_supple(&mut group_smps, ch, herd.time_pan_index);
            }
        }
        for ovr in &mut herd.overdrives {
            ovr.tone_supple(&mut group_smps);
        }
        for delay in &mut herd.delays {
            delay.tone_supple(ch, &mut group_smps);
        }

        let mut out_samp: i32 = 0;

        for group_smp in group_smps {
            out_samp += group_smp;
        }

        out[ch as usize] = T::from_moo_samp(out_samp);
    }
    if advance {
        herd.smp_count += 1;
    }
    herd.time_pan_index = (herd.time_pan_index + 1) & (PanTimeBuf::LEN - 1);

    for unit in herd.units.iter_mut().chain(extra_units.iter_mut()) {
        #[expect(clippy::cast_sign_loss)]
        let key_now = unit.tone_increment_key() as usize;
        unit.tone_increment_sample(
            PULSE_FREQ.get2(key_now) * herd.smp_stride,
            &ins.voices,
            extra_voices,
        );
    }

    for delay in &mut herd.delays {
        delay.tone_increment();
    }

    if herd.smp_count >= herd.smp_end {
        if !herd.loop_ {
            return false;
        }
        herd.smp_count = herd.smp_repeat;
        herd.evt_idx = 0;
        herd.tune_cow_voices(ins, master.timing, extra_voices);
    }
    true
}

fn do_next_event(
    herd: &mut Herd,
    ins: &MooInstructions,
    events: &[Event],
    master: &Master,
    clock: Tick,
    dst_sps: SampleRate,
    extra_voices: &[Voice],
) -> ControlFlow<()> {
    let evt = &events[herd.evt_idx];
    do_event(
        herd,
        ins,
        &events[herd.evt_idx + 1..],
        master,
        clock,
        dst_sps,
        evt,
        extra_voices,
    )?;
    herd.evt_idx += 1;
    ControlFlow::Continue(())
}

/// Do a single event
///
/// Usually you can provide an empty `events_after` slice.
/// `events_after` is only needed for normal playback of a song (with [`Herd::moo`]).
pub fn do_event(
    herd: &mut Herd,
    ins: &MooInstructions,
    events_after: &[Event],
    master: &Master,
    clock: u32,
    dst_sps: u16,
    evt: &Event,
    extra_voices: &[Voice],
) -> ControlFlow<()> {
    let u = evt.unit;
    let Some(unit) = herd.units.get_mut(u) else {
        return ControlFlow::Break(());
    };

    match evt.payload {
        EventPayload::On { duration } => {
            unit.on(
                evt.unit,
                ins,
                events_after,
                clock,
                duration,
                evt.tick,
                herd.smp_end,
                extra_voices,
            );
        }
        EventPayload::Key(key) => unit.set_key(key),
        EventPayload::PanVol(vol) => unit.tone_pan_volume(vol),
        EventPayload::PanTime(pan) => unit.tone_pan_time(pan, dst_sps),
        EventPayload::Velocity(vel) => unit.velocity = vel,
        EventPayload::Volume(vol) => unit.volume = vol,
        EventPayload::Portament { duration } => {
            unit.porta_destination = timing::tick_to_sample(duration, ins.samples_per_tick);
        }
        EventPayload::BeatClock
        | EventPayload::BeatTempo
        | EventPayload::BeatNum
        | EventPayload::Repeat
        | EventPayload::Last
        | EventPayload::PtcowDebug(_) => {}
        EventPayload::SetVoice(num) => unit.reset_voice(ins, num, master.timing, extra_voices),
        EventPayload::SetGroup(num) => unit.group = num,
        EventPayload::Tuning(tuning) => unit.tuning = tuning,
        EventPayload::Null => return ControlFlow::Break(()),
    }
    ControlFlow::Continue(())
}

fn get_total_sample(master: &Master, out_sample_rate: SampleRate) -> u32 {
    calc_sample_num(
        master.meas_num,
        master.timing.beats_per_meas.into(),
        out_sample_rate,
        master.timing.bpm,
    )
}

#[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn calc_sample_num(meas_num: u32, beat_num: u32, sps: SampleRate, beat_tempo: f32) -> u32 {
    if beat_tempo == 0. {
        return 0;
    }
    let total_beat_num: u32 = meas_num * beat_num;
    (f64::from(sps) * 60. * f64::from(total_beat_num) / f64::from(beat_tempo)) as u32
}

/// Prepare to [`moo`](Herd::moo).
///
/// # Panics
///
/// - If `ins.out_sample_rate` is 0
#[expect(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
pub fn moo_prepare(
    ins: &mut MooInstructions,
    herd: &mut Herd,
    song: &Song,
    plan: &MooPlan,
    extra_voices: &[Voice],
) {
    assert_ne!(ins.out_sample_rate, 0);

    let meas_end = plan.meas_end.unwrap_or_else(|| song.master.end_meas());
    let meas_repeat = plan.meas_repeat.unwrap_or(song.master.loop_points.repeat);

    herd.loop_ = plan.loop_;

    ins.samples_per_tick = timing::samples_per_tick(ins.out_sample_rate, song.master.timing);
    herd.smp_stride = f32::from(NATIVE_SAMPLE_RATE) / f32::from(ins.out_sample_rate);

    herd.time_pan_index = 0;

    herd.smp_end = meas_to_sample(meas_end, ins.samples_per_tick, song.master.timing);
    herd.smp_repeat = meas_to_sample(meas_repeat, ins.samples_per_tick, song.master.timing);

    herd.smp_start = match plan.start_pos {
        StartPosPlan::Meas(val) => meas_to_sample(val, ins.samples_per_tick, song.master.timing),
        StartPosPlan::Sample(val) => val,
        StartPosPlan::F32(val) => {
            (get_total_sample(&song.master, ins.out_sample_rate) as f32 * val) as u32
        }
    };

    herd.smp_count = herd.smp_start;
    herd.smp_smooth = ins.out_sample_rate / 250;

    herd.evt_idx = 0;
    herd.tune_cow_voices(ins, song.master.timing, extra_voices);
}

impl Herd {
    /// Moo the song into a stereo signed 16 bit little endian PCM buffer.
    ///
    /// If `advance` is true, the playback proceeds to the next event.
    /// Setting it to false can be useful for pausing playback, while still allowing
    /// the [`Unit`](crate::Unit)s to play audio.
    pub fn moo<T: OutSample>(
        &mut self,
        ins: &MooInstructions,
        song: &Song,
        buf: &mut [T],
        advance: bool,
        extra_units: &mut [crate::Unit],
        extra_voices: &[Voice],
    ) -> bool {
        if self.moo_end {
            return false;
        }

        for out_samp in buf.as_chunks_mut().0 {
            if !next_sample(
                self,
                ins,
                &song.events,
                &song.master,
                ins.out_sample_rate,
                out_samp,
                advance,
                extra_units,
                extra_voices,
            ) {
                self.moo_end = true;
                break;
            }
        }

        true
    }
}

/// Plan for the cows on how to moo the song
#[derive(Copy, Clone)]
pub struct MooPlan {
    /// Start position
    pub start_pos: StartPosPlan,
    /// End position
    pub meas_end: Option<Meas>,
    /// Repeat position
    pub meas_repeat: Option<Meas>,
    /// Whether to loop the song
    pub loop_: bool,
}

/// Start position that can be given in different units
#[derive(Copy, Clone)]
pub enum StartPosPlan {
    /// Start position as [`Meas`]
    Meas(Meas),
    /// Start position as [`SampleT`]
    Sample(SampleT),
    /// Start position as [`f32`]
    F32(f32),
}