#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EventDirection")]
pub struct PyEventDirection {
pub(crate) direction: events::EventDirection,
}
#[pymethods]
impl PyEventDirection {
#[classattr]
#[pyo3(name = "INCREASING")]
fn increasing() -> Self {
PyEventDirection {
direction: events::EventDirection::Increasing,
}
}
#[classattr]
#[pyo3(name = "DECREASING")]
fn decreasing() -> Self {
PyEventDirection {
direction: events::EventDirection::Decreasing,
}
}
#[classattr]
#[pyo3(name = "ANY")]
fn any() -> Self {
PyEventDirection {
direction: events::EventDirection::Any,
}
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
match op {
CompareOp::Eq => Ok(self.direction == other.direction),
CompareOp::Ne => Ok(self.direction != other.direction),
_ => Err(exceptions::PyNotImplementedError::new_err(
"Comparison not supported",
)),
}
}
fn __str__(&self) -> String {
format!("{:?}", self.direction)
}
fn __repr__(&self) -> String {
format!("EventDirection.{:?}", self.direction)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EdgeType")]
pub struct PyEdgeType {
pub(crate) edge: events::EdgeType,
}
#[pymethods]
impl PyEdgeType {
#[classattr]
#[pyo3(name = "RISING_EDGE")]
fn rising_edge() -> Self {
PyEdgeType {
edge: events::EdgeType::RisingEdge,
}
}
#[classattr]
#[pyo3(name = "FALLING_EDGE")]
fn falling_edge() -> Self {
PyEdgeType {
edge: events::EdgeType::FallingEdge,
}
}
#[classattr]
#[pyo3(name = "ANY_EDGE")]
fn any_edge() -> Self {
PyEdgeType {
edge: events::EdgeType::AnyEdge,
}
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
match op {
CompareOp::Eq => Ok(self.edge == other.edge),
CompareOp::Ne => Ok(self.edge != other.edge),
_ => Err(exceptions::PyNotImplementedError::new_err(
"Comparison not supported",
)),
}
}
fn __str__(&self) -> String {
format!("{:?}", self.edge)
}
fn __repr__(&self) -> String {
format!("EdgeType.{:?}", self.edge)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EventAction")]
pub struct PyEventAction {
pub(crate) action: events::EventAction,
}
#[pymethods]
impl PyEventAction {
#[classattr]
#[pyo3(name = "STOP")]
fn stop() -> Self {
PyEventAction {
action: events::EventAction::Stop,
}
}
#[classattr]
#[pyo3(name = "CONTINUE")]
fn continue_() -> Self {
PyEventAction {
action: events::EventAction::Continue,
}
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
match op {
CompareOp::Eq => Ok(self.action == other.action),
CompareOp::Ne => Ok(self.action != other.action),
_ => Err(exceptions::PyNotImplementedError::new_err(
"Comparison not supported",
)),
}
}
fn __str__(&self) -> String {
format!("{:?}", self.action)
}
fn __repr__(&self) -> String {
format!("EventAction.{:?}", self.action)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EventType")]
pub struct PyEventType {
pub(crate) event_type: events::EventType,
}
#[pymethods]
impl PyEventType {
#[classattr]
#[pyo3(name = "INSTANTANEOUS")]
fn instantaneous() -> Self {
PyEventType {
event_type: events::EventType::Instantaneous,
}
}
#[classattr]
#[pyo3(name = "WINDOW")]
fn period() -> Self {
PyEventType {
event_type: events::EventType::Window,
}
}
fn __richcmp__(&self, other: &Self, op: CompareOp) -> PyResult<bool> {
match op {
CompareOp::Eq => Ok(self.event_type == other.event_type),
CompareOp::Ne => Ok(self.event_type != other.event_type),
_ => Err(exceptions::PyNotImplementedError::new_err(
"Comparison not supported",
)),
}
}
fn __str__(&self) -> String {
format!("{:?}", self.event_type)
}
fn __repr__(&self) -> String {
format!("EventType.{:?}", self.event_type)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "DetectedEvent")]
pub struct PyDetectedEvent {
pub(crate) event: events::DDetectedEvent,
}
#[pymethods]
impl PyDetectedEvent {
#[getter]
fn window_open(&self) -> PyEpoch {
PyEpoch {
obj: self.event.window_open,
}
}
#[getter]
fn window_close(&self) -> PyEpoch {
PyEpoch {
obj: self.event.window_close,
}
}
#[getter]
fn entry_state<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray<f64, Ix1>> {
self.event.entry_state.as_slice().to_pyarray(py).to_owned()
}
#[getter]
fn exit_state<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray<f64, Ix1>> {
self.event.exit_state.as_slice().to_pyarray(py).to_owned()
}
#[getter]
fn value(&self) -> f64 {
self.event.value
}
#[getter]
fn name(&self) -> String {
self.event.name.clone()
}
#[getter]
fn action(&self) -> PyEventAction {
PyEventAction {
action: self.event.action,
}
}
#[getter]
fn event_type(&self) -> PyEventType {
PyEventType {
event_type: self.event.event_type,
}
}
fn t_start(&self) -> PyEpoch {
PyEpoch {
obj: self.event.t_start(),
}
}
fn t_end(&self) -> PyEpoch {
PyEpoch {
obj: self.event.t_end(),
}
}
fn start_time(&self) -> PyEpoch {
PyEpoch {
obj: self.event.start_time(),
}
}
fn end_time(&self) -> PyEpoch {
PyEpoch {
obj: self.event.end_time(),
}
}
fn __str__(&self) -> String {
format!(
"DetectedEvent('{}', window: [{}, {}])",
self.event.name, self.event.window_open, self.event.window_close
)
}
fn __repr__(&self) -> String {
format!(
"DetectedEvent(name='{}', window_open={}, window_close={}, event_type={:?})",
self.event.name, self.event.window_open, self.event.window_close, self.event.event_type
)
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "TimeEvent")]
pub struct PyTimeEvent {
event: Option<events::DTimeEvent>,
target_time: time::Epoch,
base_name: String,
instance: Option<usize>,
is_terminal: bool,
time_tol: f64,
step_reduction_factor: f64,
py_callback: Option<Py<PyAny>>,
}
#[pymethods]
impl PyTimeEvent {
#[new]
#[pyo3(signature = (target_epoch, name))]
fn new(target_epoch: PyRef<PyEpoch>, name: String) -> PyResult<Self> {
let event = events::DTimeEvent::new(target_epoch.obj, name.clone());
Ok(PyTimeEvent {
event: Some(event),
target_time: target_epoch.obj,
base_name: name,
instance: None,
is_terminal: false,
time_tol: 1e-6,
step_reduction_factor: 0.2,
py_callback: None,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
target_time: slf.target_time,
base_name: slf.base_name.clone(),
instance: Some(instance),
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
step_reduction_factor: slf.step_reduction_factor,
py_callback: slf.py_callback.take(),
}
}
#[allow(deprecated)]
fn with_callback(mut slf: PyRefMut<'_, Self>, callback: Py<PyAny>) -> Self {
let callback_clone = callback.clone_ref(slf.py());
let rust_callback = Box::new(
move |t: time::Epoch,
state: &DVector<f64>,
_params: Option<&DVector<f64>>|
-> (Option<DVector<f64>>, Option<DVector<f64>>, events::EventAction) {
Python::attach(|py| {
let py_epoch = Py::new(py, PyEpoch { obj: t }).ok();
let state_array = state.as_slice().to_pyarray(py).to_owned();
if py_epoch.is_none() {
return (None, None, events::EventAction::Continue);
}
let result = callback_clone.bind(py).call1((py_epoch.unwrap(), state_array));
match result {
Ok(tuple) => {
let item0 = tuple.get_item(0).ok();
let new_state: Option<DVector<f64>> = item0.and_then(|item| {
if item.is_none() {
None
} else {
pyany_to_f64_array1(&item, None)
.ok()
.map(DVector::from_vec)
}
});
let item1 = tuple.get_item(1).ok();
let action: events::EventAction = item1
.and_then(|item| item.extract::<PyRef<PyEventAction>>().ok())
.map(|a| a.action)
.unwrap_or(events::EventAction::Continue);
(new_state, None, action)
}
Err(e) => {
eprintln!("Warning: callback failed: {}", e);
(None, None, events::EventAction::Continue)
}
}
})
},
) as events::DEventCallback;
let py_callback_stored = callback.clone_ref(slf.py());
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_callback(rust_callback));
}
Self {
event: slf.event.take(),
target_time: slf.target_time,
base_name: slf.base_name.clone(),
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
step_reduction_factor: slf.step_reduction_factor,
py_callback: Some(py_callback_stored),
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
target_time: slf.target_time,
base_name: slf.base_name.clone(),
instance: slf.instance,
is_terminal: true,
time_tol: slf.time_tol,
step_reduction_factor: slf.step_reduction_factor,
py_callback: slf.py_callback.take(),
}
}
fn with_time_tolerance(mut slf: PyRefMut<'_, Self>, time_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_time_tolerance(time_tol));
}
Self {
event: slf.event.take(),
target_time: slf.target_time,
base_name: slf.base_name.clone(),
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol,
step_reduction_factor: slf.step_reduction_factor,
py_callback: slf.py_callback.take(),
}
}
}
impl PyTimeEvent {
pub fn to_s_event(&self) -> events::STimeEvent<6, 0> {
let mut event = events::STimeEvent::<6, 0>::new(self.target_time, self.base_name.clone())
.with_time_tolerance(self.time_tol)
.with_step_reduction_factor(self.step_reduction_factor);
if let Some(instance) = self.instance {
event = event.with_instance(instance);
}
if self.is_terminal {
event = event.set_terminal();
}
event
}
pub fn get_py_callback(&self) -> Option<&Py<PyAny>> {
self.py_callback.as_ref()
}
pub fn is_consumed(&self) -> bool {
self.event.is_none()
}
pub fn take_d_event(&mut self) -> Option<events::DTimeEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ValueEvent")]
pub struct PyValueEvent {
event: Option<events::DValueEvent>,
value_fn_py: Option<Py<PyAny>>,
base_name: String,
target_value: f64,
direction: events::EventDirection,
instance: Option<usize>,
is_terminal: bool,
time_tol: f64,
value_tol: f64,
py_callback: Option<Py<PyAny>>,
}
#[pymethods]
impl PyValueEvent {
#[new]
#[pyo3(signature = (name, value_fn, target_value, direction))]
#[allow(deprecated)]
fn new(
py: Python<'_>,
name: String,
value_fn: Py<PyAny>,
target_value: f64,
direction: PyRef<PyEventDirection>,
) -> PyResult<Self> {
let value_fn_clone = value_fn.clone_ref(py);
let rust_value_fn =
move |t: time::Epoch, state: &DVector<f64>, _params: Option<&DVector<f64>>| -> f64 {
Python::attach(|py| {
let py_epoch = match Py::new(py, PyEpoch { obj: t }) {
Ok(e) => e,
Err(e) => {
eprintln!("Warning: Failed to create PyEpoch: {}", e);
return 0.0; }
};
let state_array = state.as_slice().to_pyarray(py).to_owned();
let result: PyResult<Bound<'_, pyo3::PyAny>> = value_fn_clone
.bind(py)
.call1((py_epoch, state_array));
match result {
Ok(val) => val.extract::<f64>().unwrap_or_else(|e| {
eprintln!("Warning: value_fn must return float: {}", e);
0.0
}),
Err(e) => {
eprintln!("Warning: value_fn call failed: {}", e);
0.0
}
}
})
};
let event = events::DValueEvent::new(name.clone(), rust_value_fn, target_value, direction.direction);
Ok(PyValueEvent {
event: Some(event),
value_fn_py: Some(value_fn),
base_name: name,
target_value,
direction: direction.direction,
instance: None,
is_terminal: false,
time_tol: 1e-6,
value_tol: 1e-9,
py_callback: None,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
value_fn_py: slf.value_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
target_value: slf.target_value,
direction: slf.direction,
instance: Some(instance),
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self {
event: slf.event.take(),
value_fn_py: slf.value_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
target_value: slf.target_value,
direction: slf.direction,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol,
value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
#[allow(deprecated)]
fn with_callback(mut slf: PyRefMut<'_, Self>, callback: Py<PyAny>) -> Self {
let callback_clone = callback.clone_ref(slf.py());
let rust_callback = Box::new(
move |t: time::Epoch,
state: &DVector<f64>,
_params: Option<&DVector<f64>>|
-> (Option<DVector<f64>>, Option<DVector<f64>>, events::EventAction) {
Python::attach(|py| {
let py_epoch = Py::new(py, PyEpoch { obj: t }).ok();
let state_array = state.as_slice().to_pyarray(py).to_owned();
if py_epoch.is_none() {
return (None, None, events::EventAction::Continue);
}
let result = callback_clone.bind(py).call1((py_epoch.unwrap(), state_array));
match result {
Ok(tuple) => {
let item0 = tuple.get_item(0).ok();
let new_state: Option<DVector<f64>> = item0.and_then(|item| {
if item.is_none() {
None
} else {
pyany_to_f64_array1(&item, None)
.ok()
.map(DVector::from_vec)
}
});
let item1 = tuple.get_item(1).ok();
let action: events::EventAction = item1
.and_then(|item| item.extract::<PyRef<PyEventAction>>().ok())
.map(|a| a.action)
.unwrap_or(events::EventAction::Continue);
(new_state, None, action)
}
Err(e) => {
eprintln!("Warning: callback failed: {}", e);
(None, None, events::EventAction::Continue)
}
}
})
},
) as events::DEventCallback;
let py_callback_stored = callback.clone_ref(slf.py());
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_callback(rust_callback));
}
Self {
event: slf.event.take(),
value_fn_py: slf.value_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
target_value: slf.target_value,
direction: slf.direction,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: Some(py_callback_stored),
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
value_fn_py: slf.value_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
target_value: slf.target_value,
direction: slf.direction,
instance: slf.instance,
is_terminal: true,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
}
impl PyValueEvent {
pub fn get_value_fn_py(&self) -> Option<&Py<PyAny>> {
self.value_fn_py.as_ref()
}
pub fn get_py_callback(&self) -> Option<&Py<PyAny>> {
self.py_callback.as_ref()
}
pub fn get_base_name(&self) -> &str {
&self.base_name
}
pub fn get_target_value(&self) -> f64 {
self.target_value
}
pub fn get_direction(&self) -> events::EventDirection {
self.direction
}
pub fn get_instance(&self) -> Option<usize> {
self.instance
}
pub fn is_terminal(&self) -> bool {
self.is_terminal
}
pub fn get_time_tolerance(&self) -> f64 {
self.time_tol
}
pub fn get_value_tolerance(&self) -> f64 {
self.value_tol
}
pub fn is_consumed(&self) -> bool {
self.event.is_none()
}
pub fn take_d_event(&mut self) -> Option<events::DValueEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "BinaryEvent")]
pub struct PyBinaryEvent {
event: Option<events::DBinaryEvent>,
condition_fn_py: Option<Py<PyAny>>,
base_name: String,
edge: events::EdgeType,
instance: Option<usize>,
is_terminal: bool,
time_tol: f64,
value_tol: f64,
py_callback: Option<Py<PyAny>>,
}
#[pymethods]
impl PyBinaryEvent {
#[new]
#[pyo3(signature = (name, condition_fn, edge))]
#[allow(deprecated)]
fn new(
py: Python<'_>,
name: String,
condition_fn: Py<PyAny>,
edge: PyRef<PyEdgeType>,
) -> PyResult<Self> {
let condition_fn_clone = condition_fn.clone_ref(py);
let rust_condition_fn =
move |t: time::Epoch, state: &DVector<f64>, _params: Option<&DVector<f64>>| -> bool {
Python::attach(|py| {
let py_epoch = match Py::new(py, PyEpoch { obj: t }) {
Ok(e) => e,
Err(e) => {
eprintln!("Warning: Failed to create PyEpoch: {}", e);
return false; }
};
let state_array = state.as_slice().to_pyarray(py).to_owned();
let result: PyResult<Bound<'_, pyo3::PyAny>> = condition_fn_clone
.bind(py)
.call1((py_epoch, state_array));
match result {
Ok(val) => val.extract::<bool>().unwrap_or_else(|e| {
eprintln!("Warning: condition_fn must return bool: {}", e);
false
}),
Err(e) => {
eprintln!("Warning: condition_fn call failed: {}", e);
false
}
}
})
};
let event = events::DBinaryEvent::new(name.clone(), rust_condition_fn, edge.edge);
Ok(PyBinaryEvent {
event: Some(event),
condition_fn_py: Some(condition_fn),
base_name: name,
edge: edge.edge,
instance: None,
is_terminal: false,
time_tol: 1e-6,
value_tol: 1e-9,
py_callback: None,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
condition_fn_py: slf.condition_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
edge: slf.edge,
instance: Some(instance),
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self {
event: slf.event.take(),
condition_fn_py: slf.condition_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
edge: slf.edge,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol,
value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
#[allow(deprecated)]
fn with_callback(mut slf: PyRefMut<'_, Self>, callback: Py<PyAny>) -> Self {
let callback_clone = callback.clone_ref(slf.py());
let rust_callback = Box::new(
move |t: time::Epoch,
state: &DVector<f64>,
_params: Option<&DVector<f64>>|
-> (Option<DVector<f64>>, Option<DVector<f64>>, events::EventAction) {
Python::attach(|py| {
let py_epoch = Py::new(py, PyEpoch { obj: t }).ok();
let state_array = state.as_slice().to_pyarray(py).to_owned();
if py_epoch.is_none() {
return (None, None, events::EventAction::Continue);
}
let result = callback_clone.bind(py).call1((py_epoch.unwrap(), state_array));
match result {
Ok(tuple) => {
let item0 = tuple.get_item(0).ok();
let new_state: Option<DVector<f64>> = item0.and_then(|item| {
if item.is_none() {
None
} else {
pyany_to_f64_array1(&item, None)
.ok()
.map(DVector::from_vec)
}
});
let item1 = tuple.get_item(1).ok();
let action: events::EventAction = item1
.and_then(|item| item.extract::<PyRef<PyEventAction>>().ok())
.map(|a| a.action)
.unwrap_or(events::EventAction::Continue);
(new_state, None, action)
}
Err(e) => {
eprintln!("Warning: callback failed: {}", e);
(None, None, events::EventAction::Continue)
}
}
})
},
) as events::DEventCallback;
let py_callback_stored = callback.clone_ref(slf.py());
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_callback(rust_callback));
}
Self {
event: slf.event.take(),
condition_fn_py: slf.condition_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
edge: slf.edge,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: Some(py_callback_stored),
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
condition_fn_py: slf.condition_fn_py.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
base_name: slf.base_name.clone(),
edge: slf.edge,
instance: slf.instance,
is_terminal: true,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
}
impl PyBinaryEvent {
pub fn get_condition_fn_py(&self) -> Option<&Py<PyAny>> {
self.condition_fn_py.as_ref()
}
pub fn get_py_callback(&self) -> Option<&Py<PyAny>> {
self.py_callback.as_ref()
}
pub fn get_base_name(&self) -> &str {
&self.base_name
}
pub fn get_edge(&self) -> events::EdgeType {
self.edge
}
pub fn get_instance(&self) -> Option<usize> {
self.instance
}
pub fn is_terminal(&self) -> bool {
self.is_terminal
}
pub fn get_time_tolerance(&self) -> f64 {
self.time_tol
}
pub fn get_value_tolerance(&self) -> f64 {
self.value_tol
}
pub fn is_consumed(&self) -> bool {
self.event.is_none()
}
pub fn take_d_event(&mut self) -> Option<events::DBinaryEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EventQuery")]
pub struct PyEventQuery {
events: Vec<events::DDetectedEvent>,
}
#[pymethods]
impl PyEventQuery {
#[pyo3(text_signature = "(index)")]
fn by_detector_index(&self, index: usize) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.detector_index == index)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(name)")]
fn by_name_exact(&self, name: &str) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.name == name)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(substring)")]
fn by_name_contains(&self, substring: &str) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.name.contains(substring))
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(start, end)")]
fn in_time_range(&self, start: &PyEpoch, end: &PyEpoch) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.window_open >= start.obj && e.window_open <= end.obj)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(epoch)")]
fn after(&self, epoch: &PyEpoch) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.window_open >= epoch.obj)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(epoch)")]
fn before(&self, epoch: &PyEpoch) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.window_open <= epoch.obj)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(event_type)")]
fn by_event_type(&self, event_type: &PyEventType) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.event_type == event_type.event_type)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "(action)")]
fn by_action(&self, action: &PyEventAction) -> Self {
PyEventQuery {
events: self
.events
.iter()
.filter(|e| e.action == action.action)
.cloned()
.collect(),
}
}
#[pyo3(text_signature = "()")]
fn collect(&self) -> Vec<PyDetectedEvent> {
self.events
.iter()
.map(|e| PyDetectedEvent { event: e.clone() })
.collect()
}
#[pyo3(text_signature = "()")]
fn count(&self) -> usize {
self.events.len()
}
#[pyo3(text_signature = "()")]
fn first(&self) -> Option<PyDetectedEvent> {
self.events.first().map(|e| PyDetectedEvent { event: e.clone() })
}
#[pyo3(text_signature = "()")]
fn last(&self) -> Option<PyDetectedEvent> {
self.events.last().map(|e| PyDetectedEvent { event: e.clone() })
}
#[pyo3(name = "any")]
#[pyo3(text_signature = "()")]
fn any_matches(&self) -> bool {
!self.events.is_empty()
}
#[pyo3(text_signature = "()")]
fn is_empty(&self) -> bool {
self.events.is_empty()
}
fn __iter__(slf: PyRef<'_, Self>) -> PyEventQueryIterator {
PyEventQueryIterator {
events: slf.events.clone(),
index: 0,
}
}
fn __len__(&self) -> usize {
self.events.len()
}
fn __repr__(&self) -> String {
format!("EventQuery({} events)", self.events.len())
}
}
impl PyEventQuery {
pub(crate) fn new(events: Vec<events::DDetectedEvent>) -> Self {
PyEventQuery { events }
}
}
#[pyclass(module = "brahe._brahe")]
pub struct PyEventQueryIterator {
events: Vec<events::DDetectedEvent>,
index: usize,
}
#[pymethods]
impl PyEventQueryIterator {
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<PyDetectedEvent> {
if slf.index < slf.events.len() {
let event = slf.events[slf.index].clone();
slf.index += 1;
Some(PyDetectedEvent { event })
} else {
None
}
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AltitudeEvent")]
pub struct PyAltitudeEvent {
event: Option<events::DAltitudeEvent>,
target_altitude: f64,
base_name: String,
direction: events::EventDirection,
instance: Option<usize>,
is_terminal: bool,
time_tol: f64,
value_tol: f64,
py_callback: Option<Py<PyAny>>,
}
#[pymethods]
impl PyAltitudeEvent {
#[new]
#[pyo3(signature = (value_altitude, name, direction))]
fn new(
value_altitude: f64,
name: String,
direction: PyRef<PyEventDirection>,
) -> PyResult<Self> {
let event = events::DAltitudeEvent::new(value_altitude, name.clone(), direction.direction);
Ok(PyAltitudeEvent {
event: Some(event),
target_altitude: value_altitude,
base_name: name,
direction: direction.direction,
instance: None,
is_terminal: false,
time_tol: 1e-6,
value_tol: 1e-9,
py_callback: None,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
target_altitude: slf.target_altitude,
base_name: slf.base_name.clone(),
direction: slf.direction,
instance: Some(instance),
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self {
event: slf.event.take(),
target_altitude: slf.target_altitude,
base_name: slf.base_name.clone(),
direction: slf.direction,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol,
value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
#[allow(deprecated)]
fn with_callback(mut slf: PyRefMut<'_, Self>, callback: Py<PyAny>) -> Self {
let callback_clone = callback.clone_ref(slf.py());
let rust_callback = Box::new(
move |t: time::Epoch,
state: &DVector<f64>,
_params: Option<&DVector<f64>>|
-> (Option<DVector<f64>>, Option<DVector<f64>>, events::EventAction) {
Python::attach(|py| {
let py_epoch = Py::new(py, PyEpoch { obj: t }).ok();
let state_array = state.as_slice().to_pyarray(py).to_owned();
if py_epoch.is_none() {
return (None, None, events::EventAction::Continue);
}
let result = callback_clone.bind(py).call1((py_epoch.unwrap(), state_array));
match result {
Ok(tuple) => {
let new_state: Option<DVector<f64>> = tuple
.get_item(0)
.ok()
.and_then(|item| {
if item.is_none() {
None
} else {
pyany_to_f64_array1(&item, None)
.ok()
.map(DVector::from_vec)
}
});
let action: events::EventAction = tuple
.get_item(1)
.ok()
.and_then(|item| item.extract::<PyRef<PyEventAction>>().ok())
.map(|a| a.action)
.unwrap_or(events::EventAction::Continue);
(new_state, None, action)
}
Err(e) => {
eprintln!("Warning: callback failed: {}", e);
(None, None, events::EventAction::Continue)
}
}
})
},
) as events::DEventCallback;
let py_callback_stored = callback.clone_ref(slf.py());
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_callback(rust_callback));
}
Self {
event: slf.event.take(),
target_altitude: slf.target_altitude,
base_name: slf.base_name.clone(),
direction: slf.direction,
instance: slf.instance,
is_terminal: slf.is_terminal,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: Some(py_callback_stored),
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
target_altitude: slf.target_altitude,
base_name: slf.base_name.clone(),
direction: slf.direction,
instance: slf.instance,
is_terminal: true,
time_tol: slf.time_tol,
value_tol: slf.value_tol,
py_callback: slf.py_callback.as_ref().map(|py_obj| py_obj.clone_ref(slf.py())),
}
}
}
impl PyAltitudeEvent {
pub fn get_py_callback(&self) -> Option<&Py<PyAny>> {
self.py_callback.as_ref()
}
pub fn get_base_name(&self) -> &str {
&self.base_name
}
pub fn get_target_altitude(&self) -> f64 {
self.target_altitude
}
pub fn get_direction(&self) -> events::EventDirection {
self.direction
}
pub fn get_instance(&self) -> Option<usize> {
self.instance
}
pub fn is_terminal(&self) -> bool {
self.is_terminal
}
pub fn get_time_tolerance(&self) -> f64 {
self.time_tol
}
pub fn get_value_tolerance(&self) -> f64 {
self.value_tol
}
pub fn is_consumed(&self) -> bool {
self.event.is_none()
}
pub fn take_d_event(&mut self) -> Option<events::DAltitudeEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "SemiMajorAxisEvent")]
pub struct PySemiMajorAxisEvent {
event: Option<events::DSemiMajorAxisEvent>,
}
#[pymethods]
impl PySemiMajorAxisEvent {
#[new]
#[pyo3(signature = (value, name, direction))]
fn new(value: f64, name: String, direction: PyRef<PyEventDirection>) -> PyResult<Self> {
let event = events::DSemiMajorAxisEvent::new(value, name, direction.direction);
Ok(PySemiMajorAxisEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EccentricityEvent")]
pub struct PyEccentricityEvent {
event: Option<events::DEccentricityEvent>,
}
#[pymethods]
impl PyEccentricityEvent {
#[new]
#[pyo3(signature = (value, name, direction))]
fn new(value: f64, name: String, direction: PyRef<PyEventDirection>) -> PyResult<Self> {
let event = events::DEccentricityEvent::new(value, name, direction.direction);
Ok(PyEccentricityEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "InclinationEvent")]
pub struct PyInclinationEvent {
event: Option<events::DInclinationEvent>,
}
#[pymethods]
impl PyInclinationEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event =
events::DInclinationEvent::new(value, name, direction.direction, angle_format.value);
Ok(PyInclinationEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ArgumentOfPerigeeEvent")]
pub struct PyArgumentOfPerigeeEvent {
event: Option<events::DArgumentOfPerigeeEvent>,
}
#[pymethods]
impl PyArgumentOfPerigeeEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event = events::DArgumentOfPerigeeEvent::new(
value,
name,
direction.direction,
angle_format.value,
);
Ok(PyArgumentOfPerigeeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "MeanAnomalyEvent")]
pub struct PyMeanAnomalyEvent {
event: Option<events::DMeanAnomalyEvent>,
}
#[pymethods]
impl PyMeanAnomalyEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event =
events::DMeanAnomalyEvent::new(value, name, direction.direction, angle_format.value);
Ok(PyMeanAnomalyEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyMeanAnomalyEvent {
pub fn take_d_event(&mut self) -> Option<events::DMeanAnomalyEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EccentricAnomalyEvent")]
pub struct PyEccentricAnomalyEvent {
event: Option<events::DEccentricAnomalyEvent>,
}
#[pymethods]
impl PyEccentricAnomalyEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event = events::DEccentricAnomalyEvent::new(
value,
name,
direction.direction,
angle_format.value,
);
Ok(PyEccentricAnomalyEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyEccentricAnomalyEvent {
pub fn take_d_event(&mut self) -> Option<events::DEccentricAnomalyEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "TrueAnomalyEvent")]
pub struct PyTrueAnomalyEvent {
event: Option<events::DTrueAnomalyEvent>,
}
#[pymethods]
impl PyTrueAnomalyEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event = events::DTrueAnomalyEvent::new(
value,
name,
direction.direction,
angle_format.value,
);
Ok(PyTrueAnomalyEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyTrueAnomalyEvent {
pub fn take_d_event(&mut self) -> Option<events::DTrueAnomalyEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "ArgumentOfLatitudeEvent")]
pub struct PyArgumentOfLatitudeEvent {
event: Option<events::DArgumentOfLatitudeEvent>,
}
#[pymethods]
impl PyArgumentOfLatitudeEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event = events::DArgumentOfLatitudeEvent::new(
value,
name,
direction.direction,
angle_format.value,
);
Ok(PyArgumentOfLatitudeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyArgumentOfLatitudeEvent {
pub fn take_d_event(&mut self) -> Option<events::DArgumentOfLatitudeEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AscendingNodeEvent")]
pub struct PyAscendingNodeEvent {
event: Option<events::DAscendingNodeEvent>,
}
#[pymethods]
impl PyAscendingNodeEvent {
#[new]
#[pyo3(signature = (name))]
fn new(name: String) -> PyResult<Self> {
let event = events::DAscendingNodeEvent::new(name);
Ok(PyAscendingNodeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyAscendingNodeEvent {
pub fn take_d_event(&mut self) -> Option<events::DAscendingNodeEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "DescendingNodeEvent")]
pub struct PyDescendingNodeEvent {
event: Option<events::DDescendingNodeEvent>,
}
#[pymethods]
impl PyDescendingNodeEvent {
#[new]
#[pyo3(signature = (name))]
fn new(name: String) -> PyResult<Self> {
let event = events::DDescendingNodeEvent::new(name);
Ok(PyDescendingNodeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
impl PyDescendingNodeEvent {
pub fn take_d_event(&mut self) -> Option<events::DDescendingNodeEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "SpeedEvent")]
pub struct PySpeedEvent {
event: Option<events::DSpeedEvent>,
}
#[pymethods]
impl PySpeedEvent {
#[new]
#[pyo3(signature = (value, name, direction))]
fn new(value: f64, name: String, direction: PyRef<PyEventDirection>) -> PyResult<Self> {
let event = events::DSpeedEvent::new(value, name, direction.direction);
Ok(PySpeedEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "LongitudeEvent")]
pub struct PyLongitudeEvent {
event: Option<events::DLongitudeEvent>,
}
#[pymethods]
impl PyLongitudeEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event =
events::DLongitudeEvent::new(value, name, direction.direction, angle_format.value);
Ok(PyLongitudeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "LatitudeEvent")]
pub struct PyLatitudeEvent {
event: Option<events::DLatitudeEvent>,
}
#[pymethods]
impl PyLatitudeEvent {
#[new]
#[pyo3(signature = (value, name, direction, angle_format))]
fn new(
value: f64,
name: String,
direction: PyRef<PyEventDirection>,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let event =
events::DLatitudeEvent::new(value, name, direction.direction, angle_format.value);
Ok(PyLatitudeEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "UmbraEvent")]
pub struct PyUmbraEvent {
event: Option<events::DUmbraEvent>,
}
#[pymethods]
impl PyUmbraEvent {
#[new]
#[pyo3(signature = (name, edge, ephemeris_source))]
fn new(
name: String,
edge: PyRef<PyEdgeType>,
ephemeris_source: Option<PyRef<PyEphemerisSource>>,
) -> PyResult<Self> {
let source = ephemeris_source.map(|s| (*s).into());
let event = events::DUmbraEvent::new(name, edge.edge, source);
Ok(PyUmbraEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "PenumbraEvent")]
pub struct PyPenumbraEvent {
event: Option<events::DPenumbraEvent>,
}
#[pymethods]
impl PyPenumbraEvent {
#[new]
#[pyo3(signature = (name, edge, ephemeris_source))]
fn new(
name: String,
edge: PyRef<PyEdgeType>,
ephemeris_source: Option<PyRef<PyEphemerisSource>>,
) -> PyResult<Self> {
let source = ephemeris_source.map(|s| (*s).into());
let event = events::DPenumbraEvent::new(name, edge.edge, source);
Ok(PyPenumbraEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "EclipseEvent")]
pub struct PyEclipseEvent {
event: Option<events::DEclipseEvent>,
}
#[pymethods]
impl PyEclipseEvent {
#[new]
#[pyo3(signature = (name, edge, ephemeris_source))]
fn new(
name: String,
edge: PyRef<PyEdgeType>,
ephemeris_source: Option<PyRef<PyEphemerisSource>>,
) -> PyResult<Self> {
let source = ephemeris_source.map(|s| (*s).into());
let event = events::DEclipseEvent::new(name, edge.edge, source);
Ok(PyEclipseEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "SunlitEvent")]
pub struct PySunlitEvent {
event: Option<events::DSunlitEvent>,
}
#[pymethods]
impl PySunlitEvent {
#[new]
#[pyo3(signature = (name, edge, ephemeris_source))]
fn new(
name: String,
edge: PyRef<PyEdgeType>,
ephemeris_source: Option<PyRef<PyEphemerisSource>>,
) -> PyResult<Self> {
let source = ephemeris_source.map(|s| (*s).into());
let event = events::DSunlitEvent::new(name, edge.edge, source);
Ok(PySunlitEvent { event: Some(event) })
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self { event: slf.event.take() }
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self { event: slf.event.take() }
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self { event: slf.event.take() }
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AOIEntryEvent")]
pub struct PyAOIEntryEvent {
pub(crate) event: Option<events::DAOIEntryEvent>,
pub(crate) vertices_rad: Vec<(f64, f64)>,
pub(crate) name: String,
pub(crate) is_terminal: bool,
}
#[pymethods]
impl PyAOIEntryEvent {
#[new]
#[pyo3(signature = (polygon, name))]
fn new(polygon: PyRef<PyPolygonLocation>, name: String) -> PyResult<Self> {
let vertices_rad: Vec<(f64, f64)> = polygon.location.vertices()
.iter()
.map(|v| (v[0].to_radians(), v[1].to_radians()))
.collect();
let event = events::DAOIEntryEvent::from_polygon(&polygon.location, name.clone());
Ok(PyAOIEntryEvent {
event: Some(event),
vertices_rad,
name,
is_terminal: false,
})
}
#[staticmethod]
#[pyo3(signature = (vertices, name, angle_format))]
fn from_coordinates(
vertices: Vec<(f64, f64)>,
name: String,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let vertices_rad: Vec<(f64, f64)> = match angle_format.value {
constants::AngleFormat::Degrees => vertices.iter()
.map(|(lon, lat)| (lon.to_radians(), lat.to_radians()))
.collect(),
constants::AngleFormat::Radians => vertices.clone(),
};
let event = events::DAOIEntryEvent::from_coordinates(&vertices, name.clone(), angle_format.value);
Ok(PyAOIEntryEvent {
event: Some(event),
vertices_rad,
name,
is_terminal: false,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
let is_terminal = slf.is_terminal;
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal,
}
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
let is_terminal = slf.is_terminal;
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal,
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal: true,
}
}
}
impl PyAOIEntryEvent {
pub fn take_d_event(&mut self) -> Option<events::DAOIEntryEvent> {
self.event.take()
}
}
#[pyclass(module = "brahe._brahe")]
#[pyo3(name = "AOIExitEvent")]
pub struct PyAOIExitEvent {
pub(crate) event: Option<events::DAOIExitEvent>,
pub(crate) vertices_rad: Vec<(f64, f64)>,
pub(crate) name: String,
pub(crate) is_terminal: bool,
}
#[pymethods]
impl PyAOIExitEvent {
#[new]
#[pyo3(signature = (polygon, name))]
fn new(polygon: PyRef<PyPolygonLocation>, name: String) -> PyResult<Self> {
let vertices_rad: Vec<(f64, f64)> = polygon.location.vertices()
.iter()
.map(|v| (v[0].to_radians(), v[1].to_radians()))
.collect();
let event = events::DAOIExitEvent::from_polygon(&polygon.location, name.clone());
Ok(PyAOIExitEvent {
event: Some(event),
vertices_rad,
name,
is_terminal: false,
})
}
#[staticmethod]
#[pyo3(signature = (vertices, name, angle_format))]
fn from_coordinates(
vertices: Vec<(f64, f64)>,
name: String,
angle_format: PyRef<PyAngleFormat>,
) -> PyResult<Self> {
let vertices_rad: Vec<(f64, f64)> = match angle_format.value {
constants::AngleFormat::Degrees => vertices.iter()
.map(|(lon, lat)| (lon.to_radians(), lat.to_radians()))
.collect(),
constants::AngleFormat::Radians => vertices.clone(),
};
let event = events::DAOIExitEvent::from_coordinates(&vertices, name.clone(), angle_format.value);
Ok(PyAOIExitEvent {
event: Some(event),
vertices_rad,
name,
is_terminal: false,
})
}
fn with_instance(mut slf: PyRefMut<'_, Self>, instance: usize) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
let is_terminal = slf.is_terminal;
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_instance(instance));
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal,
}
}
fn with_tolerances(mut slf: PyRefMut<'_, Self>, time_tol: f64, value_tol: f64) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
let is_terminal = slf.is_terminal;
if let Some(event) = slf.event.take() {
slf.event = Some(event.with_tolerances(time_tol, value_tol));
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal,
}
}
fn set_terminal(mut slf: PyRefMut<'_, Self>) -> Self {
let vertices_rad = std::mem::take(&mut slf.vertices_rad);
let name = std::mem::take(&mut slf.name);
if let Some(event) = slf.event.take() {
slf.event = Some(event.set_terminal());
}
Self {
event: slf.event.take(),
vertices_rad,
name,
is_terminal: true,
}
}
}
impl PyAOIExitEvent {
pub fn take_d_event(&mut self) -> Option<events::DAOIExitEvent> {
self.event.take()
}
}