use postcard_schema::Schema;
use probe_rs::rtt::{self, DownChannel, Error, Rtt, UpChannel};
use probe_rs::{Core, MemoryInterface};
use serde::{Deserialize, Serialize};
pub(crate) mod client;
pub(crate) mod processing;
pub use processing::*;
fn default_show_timestamps() -> bool {
true
}
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Default, docsplay::Display, Serialize, Deserialize, Schema,
)]
pub enum DataFormat {
#[default]
String,
BinaryLE,
Defmt,
}
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize, Schema)]
#[repr(u32)]
pub enum ChannelMode {
NoBlockSkip = 0,
NoBlockTrim = 1,
BlockIfFull = 2,
}
impl From<ChannelMode> for rtt::ChannelMode {
fn from(mode: ChannelMode) -> Self {
match mode {
ChannelMode::NoBlockSkip => rtt::ChannelMode::NoBlockSkip,
ChannelMode::NoBlockTrim => rtt::ChannelMode::NoBlockTrim,
ChannelMode::BlockIfFull => rtt::ChannelMode::BlockIfFull,
}
}
}
impl From<rtt::ChannelMode> for ChannelMode {
fn from(mode: rtt::ChannelMode) -> Self {
match mode {
rtt::ChannelMode::NoBlockSkip => ChannelMode::NoBlockSkip,
rtt::ChannelMode::NoBlockTrim => ChannelMode::NoBlockTrim,
rtt::ChannelMode::BlockIfFull => ChannelMode::BlockIfFull,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, Schema)]
pub struct RttConfig {
#[serde(default, rename = "rttEnabled")]
pub enabled: bool,
#[serde(default = "Vec::new", rename = "rttChannelFormats")]
pub channels: Vec<RttChannelConfig>,
}
impl RttConfig {
pub fn channel_config(&self, channel_number: u32) -> Option<&RttChannelConfig> {
self.channels
.iter()
.find(|ch| ch.channel_number == Some(channel_number))
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Schema)]
#[serde(rename_all = "camelCase")]
pub struct RttChannelConfig {
pub channel_number: Option<u32>,
#[serde(default)]
pub data_format: DataFormat,
#[serde(default)]
pub mode: Option<ChannelMode>,
#[serde(default = "default_show_timestamps")]
pub show_timestamps: bool,
#[serde(default)]
pub show_location: bool,
#[serde(default)]
pub log_format: Option<String>,
}
impl Default for RttChannelConfig {
fn default() -> Self {
RttChannelConfig {
channel_number: Default::default(),
data_format: Default::default(),
mode: Default::default(),
show_timestamps: default_show_timestamps(),
show_location: Default::default(),
log_format: Default::default(),
}
}
}
#[derive(Debug)]
pub struct RttActiveUpChannel {
pub up_channel: UpChannel,
rtt_buffer: Box<[u8]>,
bytes_buffered: usize,
original_mode: Option<rtt::ChannelMode>,
}
impl RttActiveUpChannel {
pub fn new(up_channel: UpChannel) -> Self {
Self {
rtt_buffer: vec![0; up_channel.buffer_size().max(1)].into_boxed_slice(),
bytes_buffered: 0,
up_channel,
original_mode: None,
}
}
pub fn change_mode(&mut self, core: &mut Core, mode: ChannelMode) -> Result<(), Error> {
if self.original_mode.is_none() {
self.original_mode = Some(self.up_channel.mode(core)?);
}
self.up_channel.set_mode(core, mode.into())
}
pub fn channel_name(&self) -> String {
self.up_channel
.name()
.map(ToString::to_string)
.unwrap_or_else(|| format!("Unnamed RTT up channel - {}", self.up_channel.number()))
}
pub fn number(&self) -> u32 {
self.up_channel.number() as u32
}
pub fn poll(&mut self, core: &mut Core) -> Result<(), Error> {
self.bytes_buffered = self.up_channel.read(core, self.rtt_buffer.as_mut())?;
Ok(())
}
pub fn buffered_data(&self) -> &[u8] {
&self.rtt_buffer[..self.bytes_buffered]
}
pub fn clean_up(&mut self, core: &mut Core) -> Result<(), Error> {
if let Some(mode) = self.original_mode.take() {
self.up_channel.set_mode(core, mode)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct RttActiveDownChannel {
pub down_channel: DownChannel,
}
impl RttActiveDownChannel {
pub fn new(down_channel: DownChannel) -> Self {
Self { down_channel }
}
pub fn channel_name(&self) -> String {
self.down_channel
.name()
.map(ToString::to_string)
.unwrap_or_else(|| format!("Unnamed RTT down channel - {}", self.down_channel.number()))
}
pub fn number(&self) -> u32 {
self.down_channel.number() as u32
}
pub fn write(&mut self, core: &mut Core<'_>, data: impl AsRef<[u8]>) -> Result<(), Error> {
self.down_channel.write(core, data.as_ref()).map(|_| ())
}
}
#[derive(Debug, thiserror::Error, docsplay::Display)]
pub enum RttSymbolError {
RttSymbolNotFound,
Goblin(#[source] goblin::error::Error),
}
#[derive(Debug)]
pub struct RttConnection {
control_block_addr: u64,
pub active_up_channels: Vec<RttActiveUpChannel>,
pub active_down_channels: Vec<RttActiveDownChannel>,
}
impl RttConnection {
pub fn new(rtt: Rtt) -> Result<Self, Error> {
let control_block_addr = rtt.ptr();
let active_up_channels = rtt
.up_channels
.into_iter()
.map(RttActiveUpChannel::new)
.collect::<Vec<_>>();
let active_down_channels = rtt
.down_channels
.into_iter()
.map(RttActiveDownChannel::new)
.collect::<Vec<_>>();
Ok(Self {
control_block_addr,
active_up_channels,
active_down_channels,
})
}
pub fn poll_channel(&mut self, core: &mut Core, channel_idx: u32) -> Result<(), Error> {
let channel_idx = channel_idx as usize;
if let Some(channel) = self.active_up_channels.get_mut(channel_idx) {
channel.poll(core)
} else {
Err(Error::MissingChannel(channel_idx))
}
}
pub fn channel_data(&self, channel_idx: u32) -> Result<&[u8], Error> {
let channel_idx = channel_idx as usize;
if let Some(channel) = self.active_up_channels.get(channel_idx) {
Ok(channel.buffered_data())
} else {
Err(Error::MissingChannel(channel_idx))
}
}
pub fn write_down_channel(
&mut self,
core: &mut Core,
channel_idx: u32,
data: impl AsRef<[u8]>,
) -> Result<(), Error> {
let channel_idx = channel_idx as usize;
if let Some(channel) = self.active_down_channels.get_mut(channel_idx) {
channel.write(core, data)
} else {
Err(Error::MissingChannel(channel_idx))
}
}
pub fn clean_up(&mut self, core: &mut Core) -> Result<(), Error> {
for channel in self.active_up_channels.iter_mut() {
channel.clean_up(core)?;
}
Ok(())
}
pub fn clear_control_block(&mut self, core: &mut Core) -> Result<(), Error> {
let zeros = vec![0; Rtt::control_block_size()];
core.write(self.control_block_addr, &zeros)?;
self.active_down_channels.clear();
self.active_up_channels.clear();
Ok(())
}
}
pub fn get_rtt_symbol_from_bytes(buffer: &[u8]) -> Result<u64, RttSymbolError> {
match goblin::elf::Elf::parse(buffer) {
Ok(binary) => {
for sym in &binary.syms {
if binary.strtab.get_at(sym.st_name) == Some("_SEGGER_RTT") {
return Ok(sym.st_value);
}
}
Err(RttSymbolError::RttSymbolNotFound)
}
Err(err) => Err(RttSymbolError::Goblin(err)),
}
}