#![cfg_attr(not(feature = "std"), no_std)]
#![allow(clippy::identity_op)]
#![allow(clippy::unnecessary_cast)]
#![allow(clippy::erasing_op)]
#![allow(clippy::new_without_default)]
#[cfg(feature = "defmt")]
use defmt::debug;
use core::fmt;
use core::fmt::Display;
#[allow(clippy::identity_op)]
#[allow(clippy::unnecessary_cast)]
mod generated {
include!(concat!(env!("OUT_DIR"), "/cat25040_device.rs"));
}
mod register_interface;
pub use generated::field_sets::StatusReg as Status;
pub use generated::Cat25040Device;
pub use register_interface::SpiRegisterInterface;
const READ_OPCODE: u8 = 0x03;
const WRITE_OPCODE: u8 = 0x02;
const BUSY_WAIT_TIME_MS: u32 = 1;
const WRITE_CYCLE_TIME_MS: u32 = 5;
const PAGE_SIZE: u8 = 16;
#[allow(async_fn_in_trait)]
pub trait Spi {
async fn transaction(&mut self, operations: &mut [Operation<'_, u8>]) -> Result<(), Cat25040Error>;
}
#[allow(async_fn_in_trait)]
pub trait HardwareInterface {
async fn wait_ms(&mut self, timeout_ms: u32);
}
pub struct Cat25040<S: Spi, H: HardwareInterface> {
device: Cat25040Device<SpiRegisterInterface<S>>,
hardware_interface: H,
}
pub use embedded_hal_async::spi::Operation;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cat25040Error {
Spi,
InvalidAddress,
InvalidLength,
DeviceNotReady,
}
impl Display for Cat25040Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Cat25040Error: {self:?}")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum BlockProtection {
None,
UpperQuarter,
UpperHalf,
Full,
}
impl BlockProtection {
const fn to_bits(self) -> (bool, bool) {
match self {
BlockProtection::None => (false, false),
BlockProtection::UpperQuarter => (false, true),
BlockProtection::UpperHalf => (true, false),
BlockProtection::Full => (true, true),
}
}
const fn from_bits(bp1: bool, bp0: bool) -> Self {
match (bp1, bp0) {
(false, false) => BlockProtection::None,
(false, true) => BlockProtection::UpperQuarter,
(true, false) => BlockProtection::UpperHalf,
(true, true) => BlockProtection::Full,
}
}
}
impl<S: Spi, H: HardwareInterface> Cat25040<S, H> {
pub fn new(spi: S, hardware_interface: H) -> Self {
let spi_interface = SpiRegisterInterface::new(spi);
let device = Cat25040Device::new(spi_interface);
Self {
device,
hardware_interface,
}
}
pub async fn init(&mut self) -> Result<(), Cat25040Error> {
self.hardware_interface.wait_ms(1).await;
let status = self.read_status().await?;
if status.busy() {
return Err(Cat25040Error::DeviceNotReady);
}
#[cfg(feature = "defmt")]
debug!("Device initialized successfully");
Ok(())
}
pub fn device(&mut self) -> &mut Cat25040Device<SpiRegisterInterface<S>> {
&mut self.device
}
pub async fn read_status(&mut self) -> Result<Status, Cat25040Error> {
self.device.status_reg().read_async().await
}
pub async fn is_busy(&mut self) -> Result<bool, Cat25040Error> {
let status = self.read_status().await?;
Ok(status.busy())
}
pub async fn write_enable(&mut self) -> Result<(), Cat25040Error> {
self.device.wren().dispatch_async().await
}
pub async fn write_disable(&mut self) -> Result<(), Cat25040Error> {
self.device.wrdi().dispatch_async().await
}
pub async fn get_block_protection(&mut self) -> Result<BlockProtection, Cat25040Error> {
let status = self.read_status().await?;
Ok(BlockProtection::from_bits(status.bp_1(), status.bp_0()))
}
pub async fn set_block_protection(&mut self, level: BlockProtection) -> Result<(), Cat25040Error> {
self.write_enable().await?;
let (bp1, bp0) = level.to_bits();
self.device.write_status_reg().write_async(|reg| {
reg.set_bp_0(bp0);
reg.set_bp_1(bp1);
}).await?;
self.hardware_interface.wait_ms(WRITE_CYCLE_TIME_MS).await;
while self.is_busy().await? {
self.hardware_interface.wait_ms(BUSY_WAIT_TIME_MS).await;
}
#[cfg(feature = "defmt")]
debug!("Set block protection to {:?}", level);
Ok(())
}
pub async fn clear_block_protection(&mut self) -> Result<(), Cat25040Error> {
self.set_block_protection(BlockProtection::None).await
}
fn get_valid_opcode_for_address(address: u16, opcode: u8) -> u8 {
if address & (1 << 8) != 0 {
opcode | (1 << 3)
} else {
opcode
}
}
#[allow(clippy::cast_possible_truncation)] pub async fn read(&mut self, address: u16, rx_buf: &mut [u8]) -> Result<(), Cat25040Error> {
let opcode = Self::get_valid_opcode_for_address(address, READ_OPCODE);
let spi = self.device.interface.spi_mut();
spi.transaction(&mut [
Operation::Write(&[opcode, address as u8]),
Operation::Read(rx_buf),
])
.await?;
#[cfg(feature = "defmt")]
debug!(
"Read 0x{:02x} from address 0x{:02x} with opcode 0x{:02x}",
rx_buf[0], address, opcode
);
Ok(())
}
#[allow(clippy::cast_possible_truncation)] pub async fn write_byte(&mut self, data: u8, address: u16) -> Result<(), Cat25040Error> {
let mut current_data = [0u8; 1];
self.read(address, &mut current_data).await?;
if current_data[0] == data {
#[cfg(feature = "defmt")]
debug!(
"Data at address 0x{:02x} is already 0x{:02x}, skipping write",
address, data
);
return Ok(());
}
self.write_enable().await?;
let opcode = Self::get_valid_opcode_for_address(address, WRITE_OPCODE);
let spi = self.device.interface.spi_mut();
spi.transaction(&mut [Operation::Write(&[opcode, address as u8, data])])
.await?;
self.hardware_interface.wait_ms(WRITE_CYCLE_TIME_MS).await;
while self.is_busy().await? {
self.hardware_interface.wait_ms(BUSY_WAIT_TIME_MS).await;
}
#[cfg(feature = "defmt")]
debug!(
"Wrote 0x{:02x} to address 0x{:02x} with opcode 0x{:02x}",
data, address, opcode
);
Ok(())
}
#[allow(clippy::cast_possible_truncation)] pub async fn write_page(&mut self, data: &[u8], address: u16) -> Result<(), Cat25040Error> {
if !address.is_multiple_of(u16::from(PAGE_SIZE)) {
return Err(Cat25040Error::InvalidAddress);
}
if data.len() != PAGE_SIZE as usize {
return Err(Cat25040Error::InvalidLength);
}
let mut cmd = [0u8; PAGE_SIZE as usize + 2];
cmd[0] = Self::get_valid_opcode_for_address(address, WRITE_OPCODE);
cmd[1] = address as u8;
cmd[2..PAGE_SIZE as usize + 2].copy_from_slice(data);
self.write_enable().await?;
let spi = self.device.interface.spi_mut();
spi.transaction(&mut [Operation::Write(&cmd)]).await?;
self.hardware_interface.wait_ms(WRITE_CYCLE_TIME_MS).await;
while self.is_busy().await? {
self.hardware_interface.wait_ms(BUSY_WAIT_TIME_MS).await;
}
#[cfg(feature = "defmt")]
debug!("Wrote page to address 0x{:02x}", address);
Ok(())
}
}
pub mod spi_device;
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
#[derive(Debug)]
struct MockSpi {
status_register: u8,
write_enable: bool,
memory: [u8; 512],
read_data: u8,
}
impl MockSpi {
pub fn new() -> Self {
Self {
status_register: 0,
write_enable: false,
memory: [0; 512],
read_data: 0,
}
}
pub fn set_memory(&mut self, address: u16, data: u8) {
self.memory[address as usize] = data;
}
}
impl Spi for MockSpi {
async fn transaction(&mut self, operations: &mut [Operation<'_, u8>]) -> Result<(), Cat25040Error> {
const WREN_OPCODE: u8 = 0x06;
const WRDI_OPCODE: u8 = 0x04;
const RDSR_OPCODE: u8 = 0x05;
const WRSR_OPCODE: u8 = 0x01;
for operation in operations {
match operation {
Operation::Write(data) => {
let base_opcode = data[0] & !0x08;
let a8: u16 = if data[0] & 0x08 != 0 { 0x100 } else { 0 };
match base_opcode {
WREN_OPCODE => {
self.write_enable = true;
}
WRDI_OPCODE => {
self.write_enable = false;
}
RDSR_OPCODE => {
self.read_data = self.status_register;
}
WRSR_OPCODE => {
if data.len() >= 2 {
let new_bp_bits = data[1] & 0x0C; self.status_register = (self.status_register & !0x0C) | new_bp_bits;
}
}
READ_OPCODE => {
let addr = a8 | (data[1] as u16);
self.read_data = self.memory[addr as usize];
}
WRITE_OPCODE => {
let base_addr = a8 | (data[1] as u16);
for (i, &byte) in data[2..].iter().enumerate() {
let page_start = base_addr & !0x0F;
let offset = ((base_addr & 0x0F) + i as u16) % PAGE_SIZE as u16;
self.memory[(page_start | offset) as usize] = byte;
}
}
_ => {
println!("MockSpi: unhandled opcode 0x{:02x}", data[0]);
}
}
}
Operation::Read(data) => {
data[0] = self.read_data;
}
_ => {}
}
}
Ok(())
}
}
struct MockHardwareInterface;
impl HardwareInterface for MockHardwareInterface {
async fn wait_ms(&mut self, _timeout_ms: u32) {
}
}
fn make_eeprom(spi: MockSpi) -> Cat25040<MockSpi, MockHardwareInterface> {
Cat25040::new(spi, MockHardwareInterface)
}
type TestCat = Cat25040<MockSpi, MockHardwareInterface>;
#[test]
fn opcode_unchanged_when_a8_clear() {
assert_eq!(TestCat::get_valid_opcode_for_address(0x00, READ_OPCODE), READ_OPCODE);
}
#[test]
fn opcode_unchanged_at_max_8bit_address() {
assert_eq!(TestCat::get_valid_opcode_for_address(0xFF, READ_OPCODE), READ_OPCODE);
}
#[test]
fn opcode_sets_bit3_when_a8_set() {
assert_eq!(TestCat::get_valid_opcode_for_address(0x100, READ_OPCODE), 0x0B);
}
#[test]
fn write_opcode_sets_bit3_when_a8_set() {
assert_eq!(TestCat::get_valid_opcode_for_address(0x1FF, WRITE_OPCODE), 0x0A);
}
#[test]
fn opcode_preserves_existing_bits() {
assert_eq!(TestCat::get_valid_opcode_for_address(0x100, 0xFF), 0xFF);
}
#[test]
fn read_returns_zero_from_blank_memory() {
let mut eeprom = make_eeprom(MockSpi::new());
let mut buf = [0xFFu8; 1];
block_on(eeprom.read(0x0000, &mut buf)).unwrap();
assert_eq!(buf[0], 0x00);
}
#[test]
fn read_returns_prepopulated_data() {
let mut spi = MockSpi::new();
spi.set_memory(0x10, 0xAB);
let mut eeprom = make_eeprom(spi);
let mut buf = [0u8; 1];
block_on(eeprom.read(0x10, &mut buf)).unwrap();
assert_eq!(buf[0], 0xAB);
}
#[test]
fn read_with_a8_address() {
let mut spi = MockSpi::new();
spi.set_memory(0x1FF, 0xCD);
let mut eeprom = make_eeprom(spi);
let mut buf = [0u8; 1];
block_on(eeprom.read(0x1FF, &mut buf)).unwrap();
assert_eq!(buf[0], 0xCD);
}
#[test]
fn read_different_addresses_return_different_data() {
let mut spi = MockSpi::new();
spi.set_memory(0x00, 0x11);
spi.set_memory(0x01, 0x22);
spi.set_memory(0x02, 0x33);
let mut eeprom = make_eeprom(spi);
let mut buf = [0u8; 1];
block_on(eeprom.read(0x00, &mut buf)).unwrap();
assert_eq!(buf[0], 0x11);
block_on(eeprom.read(0x01, &mut buf)).unwrap();
assert_eq!(buf[0], 0x22);
block_on(eeprom.read(0x02, &mut buf)).unwrap();
assert_eq!(buf[0], 0x33);
}
#[test]
fn read_status_reads_mock_status() {
let mut spi = MockSpi::new();
spi.status_register = 0x03;
let mut eeprom = make_eeprom(spi);
let status = block_on(eeprom.read_status()).unwrap();
assert!(status.busy());
assert!(status.wel());
}
#[test]
fn is_busy_returns_true_when_wip_set() {
let mut spi = MockSpi::new();
spi.status_register = 0x01; let mut eeprom = make_eeprom(spi);
assert!(block_on(eeprom.is_busy()).unwrap());
}
#[test]
fn is_busy_returns_false_when_wip_clear() {
let mut eeprom = make_eeprom(MockSpi::new());
assert!(!block_on(eeprom.is_busy()).unwrap());
}
#[test]
fn is_busy_ignores_non_wip_bits() {
let mut spi = MockSpi::new();
spi.status_register = 0x02 | 0x04; let mut eeprom = make_eeprom(spi);
assert!(!block_on(eeprom.is_busy()).unwrap());
}
#[test]
fn write_enable_succeeds() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.write_enable()).unwrap();
}
#[test]
fn write_byte_skips_when_data_already_matches() {
let mut spi = MockSpi::new();
spi.set_memory(0x05, 0xAA);
let mut eeprom = make_eeprom(spi);
block_on(eeprom.write_byte(0xAA, 0x05)).unwrap();
}
#[test]
fn write_byte_skips_at_a8_address_when_data_matches() {
let mut spi = MockSpi::new();
spi.set_memory(0x1FF, 0x42);
let mut eeprom = make_eeprom(spi);
block_on(eeprom.write_byte(0x42, 0x1FF)).unwrap();
}
#[test]
fn write_byte_stores_data_in_memory() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.write_byte(0xBE, 0x10)).unwrap();
let mut buf = [0u8; 1];
block_on(eeprom.read(0x10, &mut buf)).unwrap();
assert_eq!(buf[0], 0xBE);
}
#[test]
fn write_byte_stores_data_at_a8_address() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.write_byte(0xEF, 0x1AB)).unwrap();
let mut buf = [0u8; 1];
block_on(eeprom.read(0x1AB, &mut buf)).unwrap();
assert_eq!(buf[0], 0xEF);
}
#[test]
fn write_byte_then_read_roundtrip() {
let mut eeprom = make_eeprom(MockSpi::new());
let test_data: &[(u16, u8)] = &[
(0x00, 0xAA),
(0x01, 0xBB),
(0x02, 0xCC),
(0x03, 0xDD),
];
for &(addr, byte) in test_data {
block_on(eeprom.write_byte(byte, addr)).unwrap();
}
for &(addr, expected) in test_data {
let mut buf = [0u8; 1];
block_on(eeprom.read(addr, &mut buf)).unwrap();
assert_eq!(buf[0], expected, "mismatch at address 0x{:03x}", addr);
}
}
#[test]
fn write_byte_overwrites_existing_data() {
let mut spi = MockSpi::new();
spi.set_memory(0x20, 0xAA);
let mut eeprom = make_eeprom(spi);
block_on(eeprom.write_byte(0x55, 0x20)).unwrap();
let mut buf = [0u8; 1];
block_on(eeprom.read(0x20, &mut buf)).unwrap();
assert_eq!(buf[0], 0x55);
}
#[test]
fn write_page_stores_full_page() {
let mut eeprom = make_eeprom(MockSpi::new());
let page: [u8; 16] = [
0x00, 0x11, 0x22, 0x33,
0x44, 0x55, 0x66, 0x77,
0x88, 0x99, 0xAA, 0xBB,
0xCC, 0xDD, 0xEE, 0xFF,
];
block_on(eeprom.write_page(&page, 0x00)).unwrap();
for (i, &expected) in page.iter().enumerate() {
let mut buf = [0u8; 1];
block_on(eeprom.read(i as u16, &mut buf)).unwrap();
assert_eq!(buf[0], expected, "mismatch at offset {}", i);
}
}
#[test]
fn write_page_at_a8_address() {
let mut eeprom = make_eeprom(MockSpi::new());
let page = [0xAA; 16];
block_on(eeprom.write_page(&page, 0x100)).unwrap();
for i in 0..16u16 {
let mut buf = [0u8; 1];
block_on(eeprom.read(0x100 + i, &mut buf)).unwrap();
assert_eq!(buf[0], 0xAA, "mismatch at address 0x{:03x}", 0x100 + i);
}
}
#[test]
fn write_page_rejects_unaligned_address() {
let mut eeprom = make_eeprom(MockSpi::new());
let page = [0x00; 16];
let result = block_on(eeprom.write_page(&page, 0x05));
assert!(result.is_err());
}
#[test]
fn write_page_rejects_wrong_length() {
let mut eeprom = make_eeprom(MockSpi::new());
let short = [0x00; 8];
let result = block_on(eeprom.write_page(&short, 0x00));
assert!(result.is_err());
}
#[test]
fn write_page_does_not_affect_adjacent_pages() {
let mut spi = MockSpi::new();
for i in 0..16u16 {
spi.set_memory(i, 0xFF); spi.set_memory(0x20 + i, 0xFF); }
let mut eeprom = make_eeprom(spi);
let page = [0x42; 16];
block_on(eeprom.write_page(&page, 0x10)).unwrap();
let mut buf = [0u8; 1];
block_on(eeprom.read(0x00, &mut buf)).unwrap();
assert_eq!(buf[0], 0xFF, "page 0 was corrupted");
block_on(eeprom.read(0x20, &mut buf)).unwrap();
assert_eq!(buf[0], 0xFF, "page 2 was corrupted");
block_on(eeprom.read(0x10, &mut buf)).unwrap();
assert_eq!(buf[0], 0x42);
}
#[test]
fn write_page_then_overwrite() {
let mut eeprom = make_eeprom(MockSpi::new());
let first = [0x11; 16];
let second = [0x22; 16];
block_on(eeprom.write_page(&first, 0x00)).unwrap();
block_on(eeprom.write_page(&second, 0x00)).unwrap();
let mut buf = [0u8; 1];
for i in 0..16u16 {
block_on(eeprom.read(i, &mut buf)).unwrap();
assert_eq!(buf[0], 0x22, "mismatch at offset {}", i);
}
}
#[test]
fn init_succeeds_when_device_ready() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.init()).unwrap();
}
#[test]
fn init_fails_when_device_busy() {
let mut spi = MockSpi::new();
spi.status_register = 0x01; let mut eeprom = make_eeprom(spi);
let result = block_on(eeprom.init());
assert_eq!(result, Err(Cat25040Error::DeviceNotReady));
}
#[test]
fn get_block_protection_none() {
let mut spi = MockSpi::new();
spi.status_register = 0x00; let mut eeprom = make_eeprom(spi);
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::None);
}
#[test]
fn get_block_protection_upper_quarter() {
let mut spi = MockSpi::new();
spi.status_register = 0x04; let mut eeprom = make_eeprom(spi);
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::UpperQuarter);
}
#[test]
fn get_block_protection_upper_half() {
let mut spi = MockSpi::new();
spi.status_register = 0x08; let mut eeprom = make_eeprom(spi);
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::UpperHalf);
}
#[test]
fn get_block_protection_full() {
let mut spi = MockSpi::new();
spi.status_register = 0x0C; let mut eeprom = make_eeprom(spi);
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::Full);
}
#[test]
fn set_block_protection_none() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.set_block_protection(BlockProtection::None)).unwrap();
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::None);
}
#[test]
fn set_block_protection_upper_quarter() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.set_block_protection(BlockProtection::UpperQuarter)).unwrap();
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::UpperQuarter);
}
#[test]
fn set_block_protection_upper_half() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.set_block_protection(BlockProtection::UpperHalf)).unwrap();
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::UpperHalf);
}
#[test]
fn set_block_protection_full() {
let mut eeprom = make_eeprom(MockSpi::new());
block_on(eeprom.set_block_protection(BlockProtection::Full)).unwrap();
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::Full);
}
#[test]
fn clear_block_protection() {
let mut spi = MockSpi::new();
spi.status_register = 0x0C; let mut eeprom = make_eeprom(spi);
block_on(eeprom.clear_block_protection()).unwrap();
let level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(level, BlockProtection::None);
}
#[test]
fn block_protection_roundtrip() {
let mut eeprom = make_eeprom(MockSpi::new());
let levels = [
BlockProtection::None,
BlockProtection::UpperQuarter,
BlockProtection::UpperHalf,
BlockProtection::Full,
BlockProtection::None,
];
for &level in &levels {
block_on(eeprom.set_block_protection(level)).unwrap();
let read_level = block_on(eeprom.get_block_protection()).unwrap();
assert_eq!(read_level, level);
}
}
}