realtek-rtl8125 0.2.3

Minimal Realtek RTL8125 PCIe Ethernet driver
Documentation
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use core::sync::atomic::{Ordering as AtomicOrdering, fence};

use dma_api::CoherentArray;
use log::{debug, info, warn};
use rdif_eth::{DmaBuffer, IRxQueue, ITxQueue, NetError, QueueConfig};
use spin::Mutex;

use crate::{
    DMA_ALIGN, EARLY_PACKET_LOG_COUNT, LINK_DOWN_DROP_LOG_INTERVAL, MAX_PACKET, QUEUE_ID0,
    QUEUE_SIZE, RX_BUF_SIZE, RX_DESC_PER_CACHE_LINE, RX_IDLE_LOG_INTERVAL,
    RX_OVERFLOW_REARM_IDLE_POLLS, RX_QUEUE_CONFIG_SIZE, RX_RECLAIM_LOG_INTERVAL,
    RX_START_THRESHOLD, TX_LINK_SAMPLE_INTERVAL, TX_RECLAIM_LOG_INTERVAL, TX_SUBMIT_LOG_INTERVAL,
    descriptor::{RxDesc, TxDesc},
    read_status,
    registers::{DEFAULT_IRQ_MASK, Regs, irq_has_rx_overflow},
    set_rx_mode,
};

pub(crate) type QueueStart = Arc<Mutex<QueueStartState>>;

#[derive(Default)]
pub(crate) struct QueueStartState {
    pub(crate) tx_base: Option<u64>,
    pub(crate) rx_base: Option<u64>,
    pub(crate) rx_ready: bool,
    pub(crate) started: bool,
}

pub(crate) struct Rtl8125TxQueue {
    pub(crate) regs: Regs,
    pub(crate) desc: CoherentArray<TxDesc>,
    pub(crate) dma_mask: u64,
    pub(crate) bus_addrs: [Option<u64>; QUEUE_SIZE],
    pub(crate) next_submit: usize,
    pub(crate) next_reclaim: usize,
    pub(crate) link_up: Option<bool>,
    pub(crate) link_down_drops: u64,
    pub(crate) submitted: u64,
    pub(crate) reclaimed: u64,
}

impl ITxQueue for Rtl8125TxQueue {
    fn id(&self) -> usize {
        QUEUE_ID0
    }

    fn config(&self) -> QueueConfig {
        QueueConfig {
            dma_mask: self.dma_mask,
            align: DMA_ALIGN,
            buf_size: MAX_PACKET,
            ring_size: QUEUE_SIZE,
        }
    }

    fn submit(&mut self, buffer: DmaBuffer) -> core::result::Result<(), NetError> {
        if buffer.len > MAX_PACKET {
            return Err(NetError::NotSupported);
        }

        if !self.observe_link_before_tx(buffer.len) {
            self.link_down_drops = self.link_down_drops.saturating_add(1);
            return Err(NetError::Retry);
        }

        let idx = self.next_submit;
        let next = (idx + 1) % QUEUE_SIZE;
        if self.bus_addrs[idx].is_some() {
            return Err(NetError::Retry);
        }

        let ring_end = idx == QUEUE_SIZE - 1;
        let desc = TxDesc::new_cpu_owned(buffer.bus_addr, buffer.len, ring_end);
        self.desc.set_cpu(idx, desc);
        release_dma_descriptor();
        self.desc.set_cpu(idx, desc.release_to_hw());
        self.bus_addrs[idx] = Some(buffer.bus_addr);
        self.next_submit = next;
        self.submitted = self.submitted.saturating_add(1);
        self.regs.poll_tx();
        if self.submitted <= EARLY_PACKET_LOG_COUNT
            || self.submitted.is_multiple_of(TX_SUBMIT_LOG_INTERVAL)
        {
            info!(
                "RTL8125 tx submitted: idx={idx}, len={}, submitted={}, reclaimed={}, status={:?}",
                buffer.len,
                self.submitted,
                self.reclaimed,
                read_status(self.regs),
            );
        }
        Ok(())
    }

    fn reclaim(&mut self) -> Option<u64> {
        let idx = self.next_reclaim;
        self.bus_addrs[idx]?;
        let desc = self.desc.read_cpu(idx)?;
        if desc.is_owned_by_hw() {
            return None;
        }

        self.next_reclaim = (idx + 1) % QUEUE_SIZE;
        let bus_addr = self.bus_addrs[idx].take()?;
        self.reclaimed = self.reclaimed.saturating_add(1);
        if self.reclaimed <= EARLY_PACKET_LOG_COUNT
            || self.reclaimed.is_multiple_of(TX_RECLAIM_LOG_INTERVAL)
        {
            info!(
                "RTL8125 tx reclaimed: idx={idx}, len={}, submitted={}, reclaimed={}, status={:?}",
                desc.len(),
                self.submitted,
                self.reclaimed,
                read_status(self.regs),
            );
        }
        Some(bus_addr)
    }
}

impl Rtl8125TxQueue {
    fn observe_link_before_tx(&mut self, len: usize) -> bool {
        let must_sample = self.link_up != Some(true)
            || self.submitted == 0
            || self.submitted.is_multiple_of(TX_LINK_SAMPLE_INTERVAL);
        if !must_sample {
            return true;
        }

        let link_up = self.regs.link_up();
        let changed = self.link_up.replace(link_up) != Some(link_up);

        if link_up {
            if changed {
                let status = read_status(self.regs);
                info!("RTL8125 tx link up before submit: len={len}, status={status:?}");
            }
        } else if changed
            || self.link_down_drops == 0
            || self
                .link_down_drops
                .is_multiple_of(LINK_DOWN_DROP_LOG_INTERVAL)
        {
            let status = read_status(self.regs);
            warn!(
                "RTL8125 tx link down before submit: len={len}, dropped_tx={}, status={status:?}",
                self.link_down_drops
            );
        }

        link_up
    }
}

pub(crate) struct Rtl8125RxQueue {
    pub(crate) regs: Regs,
    pub(crate) desc: CoherentArray<RxDesc>,
    pub(crate) dma_mask: u64,
    pub(crate) start: QueueStart,
    pub(crate) bus_addrs: [Option<u64>; QUEUE_SIZE],
    pub(crate) next_submit: usize,
    pub(crate) next_reclaim: usize,
    pub(crate) idle_polls: u64,
    pub(crate) last_rx_rearm_idle: u64,
    pub(crate) submitted: usize,
    pub(crate) reclaimed: u64,
    pub(crate) rx_errors: u64,
    pub(crate) deferred_refill: VecDeque<u64>,
}

impl IRxQueue for Rtl8125RxQueue {
    fn id(&self) -> usize {
        QUEUE_ID0
    }

    fn config(&self) -> QueueConfig {
        QueueConfig {
            dma_mask: self.dma_mask,
            align: DMA_ALIGN,
            buf_size: RX_BUF_SIZE,
            ring_size: RX_QUEUE_CONFIG_SIZE,
        }
    }

    fn submit(&mut self, buffer: DmaBuffer) -> core::result::Result<(), NetError> {
        if buffer.len < RX_BUF_SIZE {
            return Err(NetError::NotSupported);
        }

        self.flush_deferred_refill();
        if self.submitted >= RX_START_THRESHOLD {
            self.deferred_refill.push_back(buffer.bus_addr);
            self.flush_deferred_refill();
            return Ok(());
        }

        let idx = self.next_submit;
        let next = (idx + 1) % QUEUE_SIZE;
        if self.bus_addrs[idx].is_some() {
            return Err(NetError::Retry);
        }

        let ring_end = idx == QUEUE_SIZE - 1;
        let desc = RxDesc::new_cpu_owned(buffer.bus_addr, RX_BUF_SIZE, ring_end);
        self.desc.set_cpu(idx, desc);
        release_dma_descriptor();
        self.desc.set_cpu(idx, desc.release_to_hw());
        self.bus_addrs[idx] = Some(buffer.bus_addr);
        self.next_submit = next;
        self.submitted = self.submitted.saturating_add(1);
        if self.submitted >= RX_START_THRESHOLD {
            let was_ready = {
                let mut start = self.start.lock();
                let was_ready = start.rx_ready;
                start.rx_ready = true;
                was_ready
            };
            if !was_ready {
                let last_opts1 = self
                    .desc
                    .read_cpu(QUEUE_SIZE - 1)
                    .map_or(0, |desc| desc.opts1);
                info!(
                    "RTL8125 rx ring ready: submitted={}, last_desc_opts1={:#x}",
                    self.submitted, last_opts1
                );
            }
            try_start_queues(self.regs, self.dma_mask, &self.start);
        }
        Ok(())
    }

    fn reclaim(&mut self) -> Option<(u64, usize)> {
        let idx = self.next_reclaim;
        let bus_addr = self.bus_addrs[idx]?;
        let desc = self.desc.read_cpu(idx)?;
        if desc.is_owned_by_hw() {
            self.idle_polls = self.idle_polls.saturating_add(1);
            let status = read_status(self.regs);
            if irq_has_rx_overflow(status.intr_status)
                && self.idle_polls.saturating_sub(self.last_rx_rearm_idle)
                    >= RX_OVERFLOW_REARM_IDLE_POLLS
            {
                self.last_rx_rearm_idle = self.idle_polls;
                warn!(
                    "RTL8125 rx overflow rearm: idx={idx}, opts1={:#x}, submitted={}, \
                     reclaimed={}, status={status:?}",
                    desc.opts1, self.submitted, self.reclaimed
                );
                self.regs.write_interrupt_status(status.intr_status);
                set_rx_mode(self.regs);
                self.regs.enable_tx_rx();
                self.regs.commit();
            }
            if self.idle_polls.is_multiple_of(RX_IDLE_LOG_INTERVAL) {
                debug!(
                    "RTL8125 rx idle: idx={idx}, opts1={:#x}, submitted={}, reclaimed={}, \
                     status={:?}",
                    desc.opts1, self.submitted, self.reclaimed, status,
                );
            }
            return None;
        }
        acquire_dma_descriptor();
        let desc = self.desc.read_cpu(idx)?;
        self.idle_polls = 0;
        self.last_rx_rearm_idle = 0;

        self.next_reclaim = (idx + 1) % QUEUE_SIZE;
        self.bus_addrs[idx] = None;

        if desc.has_error() || !desc.is_whole_packet() {
            self.rx_errors = self.rx_errors.saturating_add(1);
            warn!(
                "RTL8125 rx error: idx={idx}, opts1={:#x}, submitted={}, reclaimed={}, errors={}, \
                 status={:?}",
                desc.opts1,
                self.submitted,
                self.reclaimed,
                self.rx_errors,
                read_status(self.regs),
            );
            return Some((bus_addr, 0));
        }
        let len = desc.packet_len();
        self.reclaimed = self.reclaimed.saturating_add(1);
        if self.reclaimed.is_multiple_of(RX_RECLAIM_LOG_INTERVAL) {
            info!(
                "RTL8125 rx packet: idx={idx}, len={len}, submitted={}, reclaimed={}, status={:?}",
                self.submitted,
                self.reclaimed,
                read_status(self.regs),
            );
        }
        Some((bus_addr, len))
    }
}

impl Rtl8125RxQueue {
    fn flush_deferred_refill(&mut self) {
        while self.deferred_refill.len() >= RX_DESC_PER_CACHE_LINE {
            let Some(bus_addr) = self.deferred_refill.pop_front() else {
                break;
            };
            if let Err(err) = self.submit_deferred_buffer(bus_addr) {
                warn!("RTL8125 rx deferred refill failed: {err:?}");
                self.deferred_refill.push_front(bus_addr);
                break;
            }
        }
    }

    fn submit_deferred_buffer(&mut self, bus_addr: u64) -> core::result::Result<(), NetError> {
        let idx = self.next_submit;
        let next = (idx + 1) % QUEUE_SIZE;
        if self.bus_addrs[idx].is_some() {
            return Err(NetError::Retry);
        }

        let ring_end = idx == QUEUE_SIZE - 1;
        let desc = RxDesc::new_cpu_owned(bus_addr, RX_BUF_SIZE, ring_end);
        self.desc.set_cpu(idx, desc);
        release_dma_descriptor();
        self.desc.set_cpu(idx, desc.release_to_hw());
        self.bus_addrs[idx] = Some(bus_addr);
        self.next_submit = next;
        self.submitted = self.submitted.saturating_add(1);
        Ok(())
    }
}

pub(crate) fn release_dma_descriptor() {
    fence(AtomicOrdering::Release);
}

fn acquire_dma_descriptor() {
    fence(AtomicOrdering::Acquire);
}

pub(crate) fn try_start_queues(regs: Regs, dma_mask: u64, start: &QueueStart) {
    let (tx_base, rx_base) = {
        let mut start = start.lock();
        if start.started || !start.rx_ready {
            return;
        }
        let (Some(tx_base), Some(rx_base)) = (start.tx_base, start.rx_base) else {
            return;
        };
        start.started = true;
        (tx_base, rx_base)
    };

    regs.unlock_config();
    regs.write_tx_desc_base(tx_base);
    regs.write_rx_desc_base(rx_base);
    regs.lock_config();

    info!("RTL8125 queue DMA bases: tx={tx_base:#x}, rx={rx_base:#x}, mask={dma_mask:#x}");
    regs.write_rx_max_size(RX_BUF_SIZE as u16 + 1);
    regs.enable_tx_rx();
    regs.write_default_rx_config_8125b();
    regs.write_default_tx_config();
    regs.write_interrupt_status(u32::MAX);
    set_rx_mode(regs);
    regs.write_interrupt_mask(DEFAULT_IRQ_MASK);
    regs.commit();
    info!("RTL8125 queues started: status={:?}", read_status(regs));
}

pub(crate) fn boxed_tx(queue: Rtl8125TxQueue) -> Box<dyn ITxQueue> {
    Box::new(queue)
}

pub(crate) fn boxed_rx(queue: Rtl8125RxQueue) -> Box<dyn IRxQueue> {
    Box::new(queue)
}