// avr-oxide 0.3.1
//
// An extremely simple Rusty operating system for AVR microcontrollers
/* ringq.rs
 *
 * Developed by Tim Walls <tim.walls@snowgoons.com>
 * Copyright (c) All Rights Reserved, Tim Walls
 */
//! A simple ring queue implementation for internal use within Oxide.

// Imports ===================================================================
use core::mem::MaybeUninit;
use avr_oxide::concurrency::{Isolated, scheduler, thread};
use avr_oxide::concurrency::scheduler::ThreadState;
use avr_oxide::concurrency::util::ThreadSet;
use avr_oxide::halt_if_err;
use avr_oxide::util::datatypes::{BoundedIncDec, BoundedMaths, Volatile};

// Declarations ==============================================================

/**
 * A simple ring-queue implementation.  We make the maximum size 255 (i.e.
 * represented as a u8) so we can be sure that reading/writing that value
 * will be atomic without needing a lock.
 *
 * Note that in this implementation we can store 1-less than the size: we
 * need a buffer between start and end of queue so we can tell the difference
 * between full and empty.  An alternative is to maintain a separate length
 * counter, but then I have to make that atomic, whereas in this implementation
 * I can be sure tail is only ever written by the consumer, and head is only
 * ever written by the producer, and both are atomic.
 *
 * `E` is the element type; it must be `Copy` and must implement `Coalesce`
 * (types that never merge can opt in through `DoesNotCoalesce`).  `SIZE` is
 * the number of slots in the backing array.
 */
pub struct RingQ<E, const SIZE: usize>
  where
    E: Clone + Coalesce + Copy
{
  /// Backing storage.  A slot only holds a valid element once `append` has
  /// written to it.
  queue: [MaybeUninit<E>; SIZE],
  /// Index of the slot the next appended element will be written to.
  head: Volatile<u8>,
  /// Index of the slot the next consumed element will be read from.
  tail: Volatile<u8>,
  /// Threads that found the queue empty in `consume_blocking`; released
  /// whenever `append` stores a new element.
  blocked_consumers: ThreadSet,
  /// Threads whose `append_blocking` attempt failed; released whenever
  /// `consume` removes an element.
  blocked_producers: ThreadSet,
}

/**
 * A trait implemented by objects that can be coalesced - i.e. two objects
 * replaced by one that represents the same semantic event/action/whatever.
 */
pub trait Coalesce : Sized {
  /**
   * Produce a new element that represents the result of coalescing this
   * element with the provided one.  Neither `self` nor `with` is modified.
   *
   * Return `Ok(combined)` if the two elements can be merged, or an `Err` if
   * they cannot.  `RingQ::append` treats any `Err` as "do not merge" and
   * queues the new element separately instead.
   */
  fn coalesced(&self, with: &Self) -> Result<Self,QueueError>;
}

/**
 * Marker trait that allows a default implementation for Coalesce for types
 * that don't want the functionality.
 *
 * Implement it with an empty body (`impl DoesNotCoalesce for MyType {}`);
 * the blanket `Coalesce` implementation in this module then reports
 * `QueueError::CannotCoalesce` for every attempt to merge two values.
 */
pub trait DoesNotCoalesce {}

/**
 * Errors which can be generated by our Ring Queue.
 *
 * The type is a plain, field-less enum, so it is cheap to copy and compare.
 * It implements `Debug` so that `Result<_, QueueError>` values can be used
 * with `unwrap()`, `expect()` and the `assert_eq!` family.
 */
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueError {
  /// There is no more space in the queue to append an item
  QueueFull,

  /// An attempt to coalesce objects that cannot be combined was made
  CannotCoalesce
}


// Code ======================================================================
// Opt `u8` out of coalescing, so queues of `u8` keep every appended value as
// its own entry.
impl DoesNotCoalesce for u8 {}

// Blanket implementation: anything marked `DoesNotCoalesce` gets a `Coalesce`
// implementation that refuses every merge.
impl<T> Coalesce for T
where
  T: DoesNotCoalesce
{
  /// Always fails with `QueueError::CannotCoalesce`; the other element is
  /// never inspected.
  fn coalesced(&self, _other: &Self) -> Result<Self, QueueError> {
    Result::Err(QueueError::CannotCoalesce)
  }
}

impl<E, const SIZE: usize> Default for RingQ<E,SIZE>
where
  E: Clone + Coalesce + Copy
{
  /// The default queue is simply an empty one, exactly as built by `new()`.
  fn default() -> Self {
    Self::new()
  }
}

impl<E, const SIZE: usize> RingQ<E,SIZE>
where
  E: Clone + Coalesce + Copy
{
  /**
   * Create an empty queue: no slot is initialised, head and tail both start
   * at index 0, and no threads are waiting on it.
   */
  pub fn new() -> Self {
    let queue: [MaybeUninit<E>; SIZE] = [MaybeUninit::uninit(); SIZE];

    Self {
      queue,
      head: 0u8.into(),
      tail: 0u8.into(),
      blocked_consumers: ThreadSet::new(),
      blocked_producers: ThreadSet::new(),
    }
  }

  /**
   * Create a queue that already contains the given element.  If the element
   * cannot be appended, the system is halted with an `InternalError`.
   */
  pub fn new_with(event: E) -> Self {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue = Self::new();
      halt_if_err!(queue.append(isotoken, event), avr_oxide::oserror::OsError::InternalError);
      queue
    })
  }

  /**
   * Return the number of elements currently held in the queue, computed as
   * the bounded difference between head and tail.
   */
  #[allow(dead_code)]
  pub fn len(&self) -> u8 {
    self.head.bsub::<SIZE>(&self.tail)
  }

  /**
   * Take the oldest entry out of the queue, or return `None` if the queue is
   * empty.  This method does no locking of its own; the caller supplies the
   * isolation token.  Any producers waiting for space are released once an
   * entry has been removed.
   */
  pub fn consume(&mut self, isotoken: Isolated) -> Option<E> {
    let tail = self.tail.read();

    // Head and tail coinciding means nothing is queued
    if self.head == tail {
      return None;
    }

    // Copy the value out *before* the tail index moves on.
    // SAFETY: head != tail, so the slot at `tail` is one that `append` wrote
    // to before it advanced head past it.
    let consumed = unsafe { self.queue[tail as usize].assume_init_read() };

    self.tail.binc_isolated::<SIZE>(isotoken);

    // A slot has been freed, so wake every producer waiting for space
    scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_producers);

    Some(consumed)
  }

  /**
   * Take the oldest entry out of the queue, waiting for one to arrive if the
   * queue is currently empty.
   */
  pub fn consume_blocking(&mut self) -> E {
    loop {
      let attempt = avr_oxide::concurrency::interrupt::isolated(|isotoken|{
        let taken = self.consume(isotoken);

        if taken.is_none() {
          // Nothing available: register as a blocked consumer now; the
          // actual yield happens after we leave the isolated section.
          self.blocked_consumers.add_current_thread(isotoken);
          scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
        }

        taken
      });

      if let Some(value) = attempt {
        return value;
      }

      // Give up the processor, then try again on the next pass
      thread::yield_now();
    }
  }

  /**
   * Add an entry to the queue if possible.
   *
   * When the queue is not empty, the new element is first offered to the most
   * recently queued element for coalescing; on success the merged value
   * replaces that element and nothing new is stored.  Otherwise the element
   * is written at the head, and any waiting consumers are released.
   *
   * Returns `Err(QueueError::QueueFull)` if there is no free slot.
   */
  pub fn append(&mut self, isotoken: Isolated, element: E) -> Result<(), QueueError> {
    let head = self.head.read();

    if head != self.tail.read() { // Something is already queued
      let prev = head.bsub_isolated::<SIZE>(isotoken, 1);
      let newest_slot = prev as usize;

      // SAFETY: the queue is not empty, so the slot just behind head holds
      // the element stored by the most recent successful append.
      let newest = unsafe { self.queue[newest_slot].assume_init() };

      // A failed coalesce is not an error here; we just queue normally below
      if let Ok(merged) = newest.coalesced(&element) {
        self.queue[newest_slot].write(merged);
        return Ok(());
      }
    }

    // One slot is always left unused so that full and empty look different
    if head == (self.tail.bsub_isolated::<SIZE>(isotoken, 1)) {
      return Err(QueueError::QueueFull);
    }

    self.queue[head as usize].write(element);
    self.head.binc_isolated::<SIZE>(isotoken);

    // There is now something to read, so wake every waiting consumer
    scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_consumers);
    Ok(())
  }

  /**
   * Add an entry to the queue, waiting for `append` to succeed if it
   * currently cannot store the element.
   */
  pub fn append_blocking(&mut self, element: E) {
    loop {
      let appended = avr_oxide::concurrency::interrupt::isolated(|isotoken|{
        match self.append(isotoken, element) {
          Ok(()) => true,
          Err(_) => {
            // Register as a blocked producer; we yield once isolation ends
            self.blocked_producers.add_current_thread(isotoken);
            scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
            false
          }
        }
      });

      if appended {
        return;
      }

      // Still waiting for space - let other threads run
      thread::yield_now();
    }
  }
}

// Tests =====================================================================
#[cfg(test)]
mod tests {
  use avr_oxide::private::ringq::{RingQ, DoesNotCoalesce, Coalesce, QueueError, BoundedMaths};

  /// Element type that opts out of coalescing, so every append occupies its
  /// own slot.
  #[derive(Clone,Copy,PartialEq,Eq,Debug)]
  struct TestEvent {
    num: u8
  }
  impl DoesNotCoalesce for TestEvent {}

  /// Element type that always coalesces, by summing the `num` fields.
  #[derive(Clone,Copy,PartialEq,Eq,Debug)]
  struct CoalescingTestEvent {
    num: u8
  }
  impl Coalesce for CoalescingTestEvent {
    fn coalesced(&self, with: &Self) -> Result<Self, QueueError> {
      println!("Self.num = {}, adding {}", self.num, with.num);
      Ok(Self {
        num: self.num + with.num
      })
    }
  }

  /// A new queue is empty, and a single append makes its length 1.
  #[test]
  fn test_ringq() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<TestEvent,10> = RingQ::new();

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      // Check the result instead of discarding it, so a failed append is
      // reported here rather than as a confusing length mismatch below.
      assert!(queue.append(isotoken, TestEvent { num: 0 }).is_ok());

      println!("Queue length after adding 1 event: {}", queue.len());
      assert_eq!(queue.len(), 1);
    });
  }

  /// A queue with 10 slots accepts exactly 9 elements and then reports
  /// `QueueFull`.
  ///
  /// The expected failure is asserted directly rather than through
  /// `#[should_panic]`, which would also accept any unrelated panic earlier
  /// in the test.
  #[test]
  fn test_ringq_bounds() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue: RingQ<TestEvent,10> = RingQ::new();

      for i in 1..=9 {
        assert!(
          queue.append(isotoken, TestEvent { num: 0 }).is_ok(),
          "append {} of 9 should have succeeded", i
        );
        println!("Appended element {}", i);
      }

      println!("Queue length: {}", queue.len());
      assert_eq!(queue.len(), 9);
      println!("Successfully added 9 events; next should be rejected");

      assert!(matches!(
        queue.append(isotoken, TestEvent { num: 0 }),
        Err(QueueError::QueueFull)
      ));

      // A rejected append must leave the queue untouched
      assert_eq!(queue.len(), 9);
    })
  }

  /// Elements come out in insertion order, including after the indices have
  /// wrapped around the end of the backing array.
  #[test]
  fn test_ringq_consume() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<TestEvent,4> = RingQ::new();

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      let test_ev1 = TestEvent { num: 1 };
      let test_ev2 = TestEvent { num: 2 };
      let test_ev3 = TestEvent { num: 3 };
      // TestEvent is Copy, so the values can be passed directly
      assert!(queue.append(isotoken, test_ev1).is_ok());
      assert!(queue.append(isotoken, test_ev2).is_ok());
      assert!(queue.append(isotoken, test_ev3).is_ok());

      println!("Queue length after adding 3 events: {}", queue.len());
      assert_eq!(queue.len(), 3);

      let consumed_ev1 = queue.consume(isotoken).unwrap();
      let consumed_ev2 = queue.consume(isotoken).unwrap();

      println!("Queue length after consuming 2 events: {}", queue.len());
      assert_eq!(queue.len(), 1);

      let test_ev4 = TestEvent { num: 4 };
      let test_ev5 = TestEvent { num: 5 };
      assert!(queue.append(isotoken, test_ev4).is_ok());
      assert!(queue.append(isotoken, test_ev5).is_ok());

      println!("Queue length after adding 2 more events: {}", queue.len());
      assert_eq!(queue.len(), 3);

      let consumed_ev3 = queue.consume(isotoken).unwrap();
      let consumed_ev4 = queue.consume(isotoken).unwrap();
      let consumed_ev5 = queue.consume(isotoken).unwrap();

      println!("Queue length after consuming 3 more events: {}", queue.len());
      assert_eq!(queue.len(), 0);

      // An empty queue has nothing more to hand out
      assert!(queue.consume(isotoken).is_none());

      // Check we got the data out in the same order we inserted it
      let inserted = [ test_ev1, test_ev2, test_ev3, test_ev4, test_ev5 ];
      let consumed = [ consumed_ev1, consumed_ev2, consumed_ev3, consumed_ev4, consumed_ev5 ];
      assert_eq!(inserted, consumed);
    })
  }

  /// Coalescable elements collapse into a single queue entry whose value is
  /// the combination of everything appended.
  #[test]
  fn test_ringq_coalesce() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<CoalescingTestEvent, 4> = RingQ::new();

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      // Five appends into a queue that can only hold three separate entries:
      // this only works because each one merges into the first.
      for num in 1..=5 {
        assert!(queue.append(isotoken, CoalescingTestEvent { num }).is_ok());
      }

      println!("Queue length after adding 5 coalescable events: {}", queue.len());
      assert_eq!(queue.len(), 1);

      let consumed_ev1 = queue.consume(isotoken).unwrap();
      println!("Consumed event: {:?}", &consumed_ev1);
      assert_eq!(consumed_ev1.num, 15);
    });
  }


}