cfg_if::cfg_if! {
if #[cfg(feature = "annotation")] {
use crate::runtime::{
execution::ExecutionState,
task::{clock::VectorClock, Task, TaskId},
};
use serde::Serialize;
use std::cell::RefCell;
use std::collections::HashMap;
use std::thread_local;
thread_local! {
    /// Annotation state for the current thread.
    ///
    /// `None` until `start_annotations` installs a fresh state, and `None` again once
    /// `stop_annotations` has taken it.
    // The initial value is a compile-time constant, so the `const` form can be used; the
    // standard library documents this form as allowing a more efficient implementation.
    static ANNOTATION_STATE: RefCell<Option<AnnotationState>> = const { RefCell::new(None) };
}
/// Identifier of an annotated object.
///
/// Wraps the object's index in `AnnotationState::objects` (assigned by `record_object`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub(crate) struct ObjectId(usize);
/// Placeholder ID returned by `record_object` when annotations have not been started.
pub(crate) const DUMMY_OBJECT_ID: ObjectId = ObjectId(usize::MAX);
/// Value stored in `AnnotationState::version` by `start_annotations`.
pub(crate) const ANNOTATION_VERSION: usize = 0;
/// A source file referenced by recorded backtrace frames.
#[derive(Serialize)]
struct FileInfo {
/// Path text as it was matched in the rendered backtrace.
path: String,
}
/// A function referenced by recorded backtrace frames.
#[derive(Serialize)]
struct FunctionInfo {
/// Function name text as it was matched in the rendered backtrace.
name: String,
}
#[derive(Serialize)]
struct Frame(
)
usize,
usize,
usize,
usize,
);
/// Metadata about an annotated object, stored in `AnnotationState::objects`.
#[derive(Serialize)]
struct ObjectInfo {
/// Value of `AnnotationState::last_task_id` when the object was recorded.
created_by: TaskId,
/// Number of events already recorded when the object was recorded.
created_at: usize,
/// Optional name, set through `record_name_for_object`.
name: Option<String>,
/// Optional kind, set through `record_name_for_object`.
kind: Option<String>,
}
/// Metadata about a task, stored in `AnnotationState::tasks` at the index of its task ID.
#[derive(Serialize)]
struct TaskInfo {
/// Value of `AnnotationState::last_task_id` when the task was recorded.
created_by: TaskId,
/// Smallest event index seen for this task; starts at `usize::MAX` and is lowered by
/// `record_event` and `record_schedule`.
first_step: usize,
/// Largest event index seen for this task; starts at `0` and is raised by
/// `record_event` and `record_schedule`.
last_step: usize,
/// Optional name, set through `record_name_for_task`.
name: Option<String>,
}
/// Kinds of events that can be recorded.
///
/// Each variant's payload mirrors the arguments of the `record_*` function that emits it.
#[derive(Debug, Serialize)]
enum AnnotationEvent {
/// `(object_id)` — emitted by `record_semaphore_created`.
SemaphoreCreated(ObjectId),
/// `(object_id)` — emitted by `record_semaphore_closed`.
SemaphoreClosed(ObjectId),
/// `(object_id, num_permits)` — emitted by `record_semaphore_acquire_fast`.
SemaphoreAcquireFast(ObjectId, usize),
/// `(object_id, num_permits)` — emitted by `record_semaphore_acquire_blocked`.
SemaphoreAcquireBlocked(ObjectId, usize),
/// `(object_id, unblocked_task_id, num_permits)` — emitted by
/// `record_semaphore_acquire_unblocked`.
SemaphoreAcquireUnblocked(ObjectId, TaskId, usize),
/// `(object_id, num_permits, successful)` — emitted by `record_semaphore_try_acquire`.
SemaphoreTryAcquire(ObjectId, usize, bool),
/// `(object_id, num_permits)` — emitted by `record_semaphore_release`.
SemaphoreRelease(ObjectId, usize),
/// `(task_id, is_future)` — emitted by `record_task_created`.
TaskCreated(TaskId, bool),
/// Emitted by `record_task_terminated`.
TaskTerminated,
/// Emitted by `record_random`.
Random,
/// Emitted by `record_tick`.
Tick,
}
#[derive(Serialize)]
struct EventInfo(
TaskId,
Option<Vec<Frame>>,
AnnotationEvent,
) clock of the task
Option<VectorClock>,
Option<Vec<TaskId>>,
);
/// Accumulated annotation data for the current thread; serialized to JSON by
/// `stop_annotations`. Fields marked `#[serde(skip)]` are bookkeeping only.
#[derive(Default, Serialize)]
struct AnnotationState {
/// Set to `ANNOTATION_VERSION` by `start_annotations`.
version: usize,
/// Interned source files referenced by backtrace frames.
files: Vec<FileInfo>,
/// Maps a file path to its index in `files`.
#[serde(skip)]
path_to_file: HashMap<String, usize>,
/// Interned function names referenced by backtrace frames.
functions: Vec<FunctionInfo>,
/// Maps a function name to its index in `functions`.
#[serde(skip)]
name_to_function: HashMap<String, usize>,
/// Recorded objects, indexed by `ObjectId`.
objects: Vec<ObjectInfo>,
/// Recorded tasks, indexed by the numeric task ID.
tasks: Vec<TaskInfo>,
/// Recorded events, in recording order.
events: Vec<EventInfo>,
/// Runnable task IDs stored by `record_schedule`; taken by the next `record_event`.
#[serde(skip)]
last_runnable_ids: Option<Vec<TaskId>>,
/// Task that new events and objects are attributed to; initialised to task 0 by
/// `start_annotations` and updated by `record_schedule`.
#[serde(skip)]
last_task_id: Option<TaskId>,
/// Largest task ID passed to `record_schedule`; while `None`, `stop_annotations` writes
/// no file.
#[serde(skip)]
max_task_id: Option<TaskId>,
}
/// Serializes a vector clock as a plain sequence of its `time` entries.
impl Serialize for VectorClock {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        use serde::ser::SerializeSeq;

        let mut sequence = serializer.serialize_seq(Some(self.time.len()))?;
        // Stops at the first element that fails to serialize and propagates its error.
        self.time
            .iter()
            .try_for_each(|component| sequence.serialize_element(component))?;
        sequence.end()
    }
}
impl Serialize for TaskId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
usize::from(*self).serialize(serializer)
}
}
/// Appends `event` to the event log, attributed to the task in `last_task_id`.
///
/// Does nothing when annotations have not been started. Alongside the event it stores the
/// backtrace frames matching `./src/...` locations (when a backtrace was captured), the
/// task's clock from the current `ExecutionState` (if available), and the runnable task IDs
/// left behind by the latest `record_schedule` call.
///
/// # Panics
///
/// Panics if `last_task_id` is unset or does not index into `tasks`.
fn record_event(event: AnnotationEvent) {
    with_state(move |state| {
        use regex::Regex;
        use std::backtrace::{Backtrace, BacktraceStatus};
        use std::sync::OnceLock;

        // Compiled on first use and shared afterwards.
        static FRAME_PATTERN: OnceLock<Regex> = OnceLock::new();

        let task_id = state.last_task_id.expect("no last task ID");
        let task_index = usize::from(task_id);
        assert!(task_index < state.tasks.len());

        // Widen the task's step range so it includes the index this event is about to get.
        let step = state.events.len();
        let task = &mut state.tasks[task_index];
        task.first_step = task.first_step.min(step);
        task.last_step = task.last_step.max(step);

        let pattern = FRAME_PATTERN.get_or_init(|| {
            Regex::new(r"([0-9]+): ([^\n]+)\n +at (\./src/[^:]+):([0-9]+):([0-9]+)\b").unwrap()
        });

        let trace = Backtrace::capture();
        let frames = if trace.status() == BacktraceStatus::Captured {
            let rendered = trace.to_string();
            let mut collected = Vec::new();
            for captures in pattern.captures_iter(&rendered) {
                let (_whole, [_num, function_name, path, line_str, col_str]) = captures.extract();

                // Intern the file path, reusing the index of an already-seen path.
                let path_idx = match state.path_to_file.get(path) {
                    Some(&known) => known,
                    None => {
                        let fresh = state.files.len();
                        state.files.push(FileInfo {
                            path: path.to_string(),
                        });
                        state.path_to_file.insert(path.to_string(), fresh);
                        fresh
                    }
                };

                // Intern the function name the same way.
                let function_idx = match state.name_to_function.get(function_name) {
                    Some(&known) => known,
                    None => {
                        let fresh = state.functions.len();
                        state.functions.push(FunctionInfo {
                            name: function_name.to_string(),
                        });
                        state.name_to_function.insert(function_name.to_string(), fresh);
                        fresh
                    }
                };

                collected.push(Frame(
                    path_idx,
                    function_idx,
                    line_str.parse::<usize>().unwrap(),
                    col_str.parse::<usize>().unwrap(),
                ));
            }
            Some(collected)
        } else {
            None
        };

        let clock = ExecutionState::try_with(|exec| exec.get_clock(task_id).clone());
        let runnable = state.last_runnable_ids.take();
        state.events.push(EventInfo(task_id, frames, event, clock, runnable));
    });
}
/// Runs `f` on the current thread's annotation state.
///
/// Returns `Some` with the closure's result, or `None` (without calling `f`) when no state
/// is installed.
fn with_state<R, F>(f: F) -> Option<R>
where
    F: FnOnce(&mut AnnotationState) -> R,
{
    ANNOTATION_STATE.with(|cell| cell.borrow_mut().as_mut().map(f))
}
/// Registers a new object and returns its ID.
///
/// The object is attributed to `last_task_id` and stamped with the current event count.
/// Returns `DUMMY_OBJECT_ID` when annotations have not been started.
fn record_object() -> ObjectId {
    let recorded = with_state(|state| {
        let index = state.objects.len();
        let info = ObjectInfo {
            created_by: state.last_task_id.unwrap(),
            created_at: state.events.len(),
            name: None,
            kind: None,
        };
        state.objects.push(info);
        ObjectId(index)
    });
    match recorded {
        Some(id) => id,
        None => DUMMY_OBJECT_ID,
    }
}
pub(crate) fn start_annotations() {
ANNOTATION_STATE.with(|cell| {
let mut bw = cell.borrow_mut();
assert!(bw.is_none(), "annotations already started");
let mut state: AnnotationState = Default::default();
state.version = ANNOTATION_VERSION;
state.last_task_id = Some(0.into());
*bw = Some(state);
});
}
/// Removes the annotation state from the current thread and writes it out as JSON.
///
/// The output path comes from the `SHUTTLE_ANNOTATION_FILE` environment variable and falls
/// back to `annotated.json` when the variable is unset. No file is written when
/// `max_task_id` is still `None`, i.e. `record_schedule` was never called.
///
/// # Panics
///
/// Panics if annotations were not started, or if serializing the state or writing the file
/// fails.
pub(crate) fn stop_annotations() {
    // Take the state out first so the `RefCell` borrow is not held across serialization and I/O.
    let state = ANNOTATION_STATE
        .with(|cell| cell.borrow_mut().take())
        .expect("annotations not started");
    if state.max_task_id.is_none() {
        return;
    }

    // `var_os` keeps a non-Unicode path usable; `var` would reject it and the default
    // file name would be used instead without any notice.
    let path = std::env::var_os("SHUTTLE_ANNOTATION_FILE").unwrap_or_else(|| "annotated.json".into());
    let json = serde_json::to_string(&state).expect("failed to serialize annotations");
    std::fs::write(&path, json).expect("failed to write annotation file");
}
/// Registers a new object, records a `SemaphoreCreated` event for it, and returns its ID.
pub(crate) fn record_semaphore_created() -> ObjectId {
    let semaphore_id = record_object();
    let event = AnnotationEvent::SemaphoreCreated(semaphore_id);
    record_event(event);
    semaphore_id
}
/// Records a `SemaphoreClosed` event for `object_id`.
pub(crate) fn record_semaphore_closed(object_id: ObjectId) {
    let event = AnnotationEvent::SemaphoreClosed(object_id);
    record_event(event);
}
/// Records a `SemaphoreAcquireFast` event for `object_id` with `num_permits`.
pub(crate) fn record_semaphore_acquire_fast(object_id: ObjectId, num_permits: usize) {
    let event = AnnotationEvent::SemaphoreAcquireFast(object_id, num_permits);
    record_event(event);
}
/// Records a `SemaphoreAcquireBlocked` event for `object_id` with `num_permits`.
pub(crate) fn record_semaphore_acquire_blocked(object_id: ObjectId, num_permits: usize) {
    let event = AnnotationEvent::SemaphoreAcquireBlocked(object_id, num_permits);
    record_event(event);
}
/// Records a `SemaphoreAcquireUnblocked` event for `object_id`, carrying
/// `unblocked_task_id` and `num_permits`.
pub(crate) fn record_semaphore_acquire_unblocked(object_id: ObjectId, unblocked_task_id: TaskId, num_permits: usize) {
    let event = AnnotationEvent::SemaphoreAcquireUnblocked(object_id, unblocked_task_id, num_permits);
    record_event(event);
}
/// Records a `SemaphoreTryAcquire` event for `object_id`, carrying `num_permits` and
/// whether the attempt was `successful`.
pub(crate) fn record_semaphore_try_acquire(object_id: ObjectId, num_permits: usize, successful: bool) {
    let event = AnnotationEvent::SemaphoreTryAcquire(object_id, num_permits, successful);
    record_event(event);
}
/// Records a `SemaphoreRelease` event for `object_id` with `num_permits`.
pub(crate) fn record_semaphore_release(object_id: ObjectId, num_permits: usize) {
    let event = AnnotationEvent::SemaphoreRelease(object_id, num_permits);
    record_event(event);
}
/// Registers a new task and records a `TaskCreated` event for it.
///
/// The task starts with an empty step range (`first_step = usize::MAX`, `last_step = 0`)
/// and is attributed to `last_task_id`.
///
/// # Panics
///
/// Panics if `task_id` is not the next free index in the task table.
pub(crate) fn record_task_created(task_id: TaskId, is_future: bool) {
    with_state(move |state| {
        let expected_index = usize::from(task_id);
        assert_eq!(state.tasks.len(), expected_index);
        let info = TaskInfo {
            created_by: state.last_task_id.unwrap(),
            first_step: usize::MAX,
            last_step: 0,
            name: None,
        };
        state.tasks.push(info);
    });
    let event = AnnotationEvent::TaskCreated(task_id, is_future);
    record_event(event);
}
/// Records a `TaskTerminated` event.
pub(crate) fn record_task_terminated() {
    let event = AnnotationEvent::TaskTerminated;
    record_event(event);
}
/// Updates the name and/or kind of a recorded object.
///
/// A `None` argument leaves the corresponding field untouched. Does nothing when
/// `object_id` does not refer to a recorded object or annotations are not started.
pub(crate) fn record_name_for_object(object_id: ObjectId, name: Option<&str>, kind: Option<&str>) {
    with_state(move |state| {
        let Some(entry) = state.objects.get_mut(object_id.0) else {
            return;
        };
        if let Some(new_name) = name {
            entry.name = Some(new_name.to_owned());
        }
        if let Some(new_kind) = kind {
            entry.kind = Some(new_kind.to_owned());
        }
    });
}
/// Stores `name` as the name of a recorded task.
///
/// Does nothing when `task_id` does not refer to a recorded task or annotations are not
/// started.
pub(crate) fn record_name_for_task(task_id: TaskId, name: &crate::current::TaskName) {
    with_state(|state| {
        let Some(entry) = state.tasks.get_mut(usize::from(task_id)) else {
            return;
        };
        let text: &String = name.into();
        entry.name = Some(text.clone());
    });
}
/// Records a `Random` event.
pub(crate) fn record_random() {
    let event = AnnotationEvent::Random;
    record_event(event);
}
/// Notes a scheduling decision: `choice` was picked out of `runnable_tasks`.
///
/// Widens the chosen task's step range to the current event count, stores the runnable task
/// IDs for the next recorded event, and makes `choice` the task that following events are
/// attributed to.
///
/// # Panics
///
/// Panics if `choice` does not index into the task table, or if the runnable IDs stored by a
/// previous call have not been consumed yet.
pub(crate) fn record_schedule(choice: TaskId, runnable_tasks: &[&Task]) {
    with_state(|state| {
        let step = state.events.len();
        let chosen = &mut state.tasks[usize::from(choice)];
        chosen.first_step = chosen.first_step.min(step);
        chosen.last_step = chosen.last_step.max(step);

        assert!(
            state.last_runnable_ids.is_none(),
            "multiple schedule calls without a Tick"
        );
        let runnable_ids: Vec<_> = runnable_tasks.iter().map(|task| task.id()).collect();
        state.last_runnable_ids = Some(runnable_ids);
        state.last_task_id = Some(choice);
        state.max_task_id = state.max_task_id.max(Some(choice));
    });
}
/// Records a `Tick` event.
pub(crate) fn record_tick() {
    let event = AnnotationEvent::Tick;
    record_event(event);
}
} else {
use crate::runtime::task::{Task, TaskId};
/// Data-less stand-in for the object ID type, used when the `annotation` feature is disabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct ObjectId;
/// The only `ObjectId` value available when the `annotation` feature is disabled.
pub(crate) const DUMMY_OBJECT_ID: ObjectId = ObjectId;
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn start_annotations() {}
/// No-op: the `annotation` feature is disabled, so no file is written.
#[inline(always)]
pub(crate) fn stop_annotations() {}
/// Records nothing (the `annotation` feature is disabled) and returns `DUMMY_OBJECT_ID`.
#[inline(always)]
pub(crate) fn record_semaphore_created() -> ObjectId {
DUMMY_OBJECT_ID
}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_closed(_object_id: ObjectId) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_acquire_fast(_object_id: ObjectId, _num_permits: usize) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_acquire_blocked(_object_id: ObjectId, _num_permits: usize) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_acquire_unblocked(_object_id: ObjectId, _unblocked_task_id: TaskId, _num_permits: usize) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_try_acquire(_object_id: ObjectId, _num_permits: usize, _successful: bool) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_semaphore_release(_object_id: ObjectId, _num_permits: usize) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_task_created(_task_id: TaskId, _future: bool) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_task_terminated() {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_name_for_object(_object_id: ObjectId, _name: Option<&str>, _kind: Option<&str>) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_name_for_task(_task_id: TaskId, _name: &crate::current::TaskName) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_random() {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_schedule(_choice: TaskId, _runnable_tasks: &[&Task]) {}
/// No-op: the `annotation` feature is disabled.
#[inline(always)]
pub(crate) fn record_tick() {}
}
}
/// Builder-style naming for values that can carry a name and a kind.
///
/// Implementors provide [`with_name_and_kind`](WithName::with_name_and_kind); the two
/// convenience methods forward to it with the other argument set to `None`.
pub trait WithName {
    /// Consumes `self` and returns it with the given optional `name` and `kind` applied.
    fn with_name_and_kind(self, name: Option<&str>, kind: Option<&str>) -> Self;

    /// Shorthand for `with_name_and_kind(Some(name), None)`.
    fn with_name(self, name: &str) -> Self
    where
        Self: Sized,
    {
        let (name, kind) = (Some(name), None);
        Self::with_name_and_kind(self, name, kind)
    }

    /// Shorthand for `with_name_and_kind(None, Some(kind))`.
    fn with_kind(self, kind: &str) -> Self
    where
        Self: Sized,
    {
        let (name, kind) = (None, Some(kind));
        Self::with_name_and_kind(self, name, kind)
    }
}