use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyModule};
use crate::operations::auth::SendAuthenticationInfoArg;
use crate::operations::location::UpdateLocationArg;
use crate::operations::mo_forward_sm::MoForwardSmArg;
use crate::operations::mt_forward_sm::MtForwardSmArg;
use crate::operations::report_sm::{ReportSmDeliveryStatusArg, SmDeliveryOutcome};
use crate::operations::sri_sm::{RoutingInfoForSmArg, RoutingInfoForSmRes};
use crate::types::{op_codes, operation_name, LocationInfoWithLmsi, SmRpDa, SmRpOa};
// Declares the Python exception class `MapError` for the `gsm_map` module,
// deriving from Python's built-in `Exception`. Code in this file raises it for
// BER encode/decode failures and for an out-of-range SM delivery outcome.
create_exception!(
gsm_map,
MapError,
PyException,
"GSM MAP protocol / BER codec error (3GPP TS 29.002)."
);
/// Converts a `rasn` decoding failure into a Python `MapError`.
///
/// The exception message is the error's `Display` text prefixed with
/// `BER decode error: `.
fn decode_err(e: rasn::error::DecodeError) -> PyErr {
    let message = format!("BER decode error: {e}");
    MapError::new_err(message)
}
/// Converts a `rasn` encoding failure into a Python `MapError`.
///
/// The exception message is the error's `Display` text prefixed with
/// `BER encode error: `.
fn encode_err(e: rasn::error::EncodeError) -> PyErr {
    let message = format!("BER encode error: {e}");
    MapError::new_err(message)
}
/// BER-encodes `val` and returns the result as an unbound Python `bytes` object.
///
/// # Errors
///
/// Returns a `MapError` (built by [`encode_err`]) when `rasn` fails to encode.
fn ber_encode<T: rasn::Encode>(val: &T, py: Python<'_>) -> PyResult<Py<PyBytes>> {
    rasn::ber::encode(val)
        .map(|encoded| PyBytes::new(py, &encoded).unbind())
        .map_err(encode_err)
}
fn ber_decode<T: rasn::Decode>(data: &[u8]) -> PyResult<T> {
rasn::ber::decode(data).map_err(decode_err)
}
/// Python class `SmRpDa`: a thin wrapper around the crate's [`SmRpDa`] value.
///
/// Instances are created through the static constructors, one per alternative.
#[pyclass(name = "SmRpDa", module = "gsm_map._gsm_map", from_py_object)]
#[derive(Clone)]
pub struct PySmRpDa {
    inner: SmRpDa,
}

#[pymethods]
impl PySmRpDa {
    /// Creates the `Imsi` alternative carrying the given bytes.
    #[staticmethod]
    fn imsi(value: Vec<u8>) -> Self {
        let inner = SmRpDa::Imsi(value.into());
        Self { inner }
    }

    /// Creates the `Lmsi` alternative carrying the given bytes.
    #[staticmethod]
    fn lmsi(value: Vec<u8>) -> Self {
        let inner = SmRpDa::Lmsi(value.into());
        Self { inner }
    }

    /// Creates the `ServiceCentreAddressDa` alternative carrying the given bytes.
    #[staticmethod]
    fn service_centre(value: Vec<u8>) -> Self {
        let inner = SmRpDa::ServiceCentreAddressDa(value.into());
        Self { inner }
    }

    /// Creates the payload-less `NoSmRpDa` alternative.
    #[staticmethod]
    fn no_sm_rp_da() -> Self {
        let inner = SmRpDa::NoSmRpDa(());
        Self { inner }
    }

    /// Tag naming the stored alternative: `imsi`, `lmsi`, `service_centre`
    /// or `no_sm_rp_da`.
    #[getter]
    fn kind(&self) -> &'static str {
        match &self.inner {
            SmRpDa::NoSmRpDa(()) => "no_sm_rp_da",
            SmRpDa::ServiceCentreAddressDa(_) => "service_centre",
            SmRpDa::Lmsi(_) => "lmsi",
            SmRpDa::Imsi(_) => "imsi",
        }
    }

    /// Payload bytes of the stored alternative, or `None` for `no_sm_rp_da`.
    #[getter]
    fn value<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        let payload = match &self.inner {
            SmRpDa::NoSmRpDa(()) => return None,
            SmRpDa::Imsi(v) | SmRpDa::Lmsi(v) | SmRpDa::ServiceCentreAddressDa(v) => v,
        };
        Some(PyBytes::new(py, payload))
    }

    /// Returns `SmRpDa(...)` wrapping the inner value's `Display` output.
    fn __repr__(&self) -> String {
        let inner = &self.inner;
        format!("SmRpDa({})", inner)
    }
}
/// Python class `SmRpOa`: a thin wrapper around the crate's [`SmRpOa`] value.
///
/// Instances are created through the static constructors, one per alternative.
#[pyclass(name = "SmRpOa", module = "gsm_map._gsm_map", from_py_object)]
#[derive(Clone)]
pub struct PySmRpOa {
    inner: SmRpOa,
}

#[pymethods]
impl PySmRpOa {
    /// Creates the `MsIsdn` alternative carrying the given bytes.
    #[staticmethod]
    fn msisdn(value: Vec<u8>) -> Self {
        let inner = SmRpOa::MsIsdn(value.into());
        Self { inner }
    }

    /// Creates the `ServiceCentreAddressOa` alternative carrying the given bytes.
    #[staticmethod]
    fn service_centre(value: Vec<u8>) -> Self {
        let inner = SmRpOa::ServiceCentreAddressOa(value.into());
        Self { inner }
    }

    /// Creates the payload-less `NoSmRpOa` alternative.
    #[staticmethod]
    fn no_sm_rp_oa() -> Self {
        let inner = SmRpOa::NoSmRpOa(());
        Self { inner }
    }

    /// Tag naming the stored alternative: `msisdn`, `service_centre`
    /// or `no_sm_rp_oa`.
    #[getter]
    fn kind(&self) -> &'static str {
        match &self.inner {
            SmRpOa::NoSmRpOa(()) => "no_sm_rp_oa",
            SmRpOa::ServiceCentreAddressOa(_) => "service_centre",
            SmRpOa::MsIsdn(_) => "msisdn",
        }
    }

    /// Payload bytes of the stored alternative, or `None` for `no_sm_rp_oa`.
    #[getter]
    fn value<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        let payload = match &self.inner {
            SmRpOa::NoSmRpOa(()) => return None,
            SmRpOa::MsIsdn(v) | SmRpOa::ServiceCentreAddressOa(v) => v,
        };
        Some(PyBytes::new(py, payload))
    }

    /// Returns `SmRpOa(...)` wrapping the inner value's `Display` output.
    fn __repr__(&self) -> String {
        let inner = &self.inner;
        format!("SmRpOa({})", inner)
    }
}
/// Python class `LocationInfoWithLmsi`: wraps the crate's [`LocationInfoWithLmsi`].
#[pyclass(
    name = "LocationInfoWithLmsi",
    module = "gsm_map._gsm_map",
    from_py_object
)]
#[derive(Clone)]
pub struct PyLocationInfoWithLmsi {
    inner: LocationInfoWithLmsi,
}

#[pymethods]
impl PyLocationInfoWithLmsi {
    /// Builds the value from raw byte strings.
    ///
    /// Everything after `network_node_number` is keyword-only. The
    /// `gprs_node_indicator` flag is stored as a present/absent marker:
    /// `True` sets it, `False` leaves it absent.
    #[new]
    #[pyo3(signature = (network_node_number, *, lmsi = None, gprs_node_indicator = false, additional_number = None))]
    fn new(
        network_node_number: Vec<u8>,
        lmsi: Option<Vec<u8>>,
        gprs_node_indicator: bool,
        additional_number: Option<Vec<u8>>,
    ) -> Self {
        let inner = LocationInfoWithLmsi {
            network_node_number: network_node_number.into(),
            lmsi: lmsi.map(|raw| raw.into()),
            gprs_node_indicator: if gprs_node_indicator {
                Some(())
            } else {
                None
            },
            additional_number: additional_number.map(|raw| raw.into()),
        };
        Self { inner }
    }

    /// Network node number as raw bytes.
    #[getter]
    fn network_node_number<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.network_node_number;
        PyBytes::new(py, raw)
    }

    /// LMSI as raw bytes, or `None` when it is not set.
    #[getter]
    fn lmsi<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        let raw = self.inner.lmsi.as_ref()?;
        Some(PyBytes::new(py, raw))
    }

    /// `True` when the GPRS node indicator marker is present.
    #[getter]
    fn gprs_node_indicator(&self) -> bool {
        matches!(self.inner.gprs_node_indicator, Some(()))
    }

    /// Additional number as raw bytes, or `None` when it is not set.
    #[getter]
    fn additional_number<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        let raw = self.inner.additional_number.as_ref()?;
        Some(PyBytes::new(py, raw))
    }

    /// Shows the hex-encoded node number and LMSI (`None` when the LMSI is absent).
    fn __repr__(&self) -> String {
        let node_hex = hex::encode(&self.inner.network_node_number);
        let lmsi_hex = match self.inner.lmsi.as_ref() {
            Some(raw) => hex::encode(raw),
            None => "None".to_string(),
        };
        format!(
            "LocationInfoWithLmsi(network_node_number={}, lmsi={})",
            node_hex, lmsi_hex
        )
    }
}
/// Python class `RoutingInfoForSmArg`: wraps the crate's [`RoutingInfoForSmArg`]
/// and adds BER `encode` / `decode`.
#[pyclass(
    name = "RoutingInfoForSmArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyRoutingInfoForSmArg {
    inner: RoutingInfoForSmArg,
}

#[pymethods]
impl PyRoutingInfoForSmArg {
    /// Builds the argument from raw byte strings and keyword-only options.
    ///
    /// `sm_rp_pri` defaults to `True`, `gprs_support_indicator` to `False` and
    /// `sm_rp_mti` to `None`. The `sm_rp_smea` field is never populated here.
    #[new]
    #[pyo3(signature = (msisdn, service_centre_address, *, sm_rp_pri = true, gprs_support_indicator = false, sm_rp_mti = None))]
    fn new(
        msisdn: Vec<u8>,
        service_centre_address: Vec<u8>,
        sm_rp_pri: bool,
        gprs_support_indicator: bool,
        sm_rp_mti: Option<i64>,
    ) -> Self {
        // The indicator is modelled as a present/absent marker, not a boolean.
        let gprs_marker = if gprs_support_indicator {
            Some(())
        } else {
            None
        };
        let inner = RoutingInfoForSmArg {
            msisdn: msisdn.into(),
            sm_rp_pri,
            service_centre_address: service_centre_address.into(),
            gprs_support_indicator: gprs_marker,
            sm_rp_mti: sm_rp_mti.map(|mti| mti.into()),
            sm_rp_smea: None,
        };
        Self { inner }
    }

    /// MSISDN as raw bytes.
    #[getter]
    fn msisdn<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.msisdn;
        PyBytes::new(py, raw)
    }

    /// Service centre address as raw bytes.
    #[getter]
    fn service_centre_address<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.service_centre_address;
        PyBytes::new(py, raw)
    }

    /// Value of the `sm_rp_pri` flag.
    #[getter]
    fn sm_rp_pri(&self) -> bool {
        self.inner.sm_rp_pri
    }

    /// `True` when the GPRS support indicator marker is present.
    #[getter]
    fn gprs_support_indicator(&self) -> bool {
        matches!(self.inner.gprs_support_indicator, Some(()))
    }

    /// Operation code constant `op_codes::SEND_ROUTING_INFO_FOR_SM`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::SEND_ROUTING_INFO_FOR_SM
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        let inner: RoutingInfoForSmArg = ber_decode(data)?;
        Ok(Self { inner })
    }

    /// Shows the hex-encoded MSISDN and service centre address plus `sm_rp_pri`.
    fn __repr__(&self) -> String {
        let msisdn_hex = hex::encode(&self.inner.msisdn);
        let sc_hex = hex::encode(&self.inner.service_centre_address);
        format!(
            "RoutingInfoForSmArg(msisdn={}, sc={}, sm_rp_pri={})",
            msisdn_hex, sc_hex, self.inner.sm_rp_pri
        )
    }
}
/// Python class `RoutingInfoForSmRes`: wraps the crate's [`RoutingInfoForSmRes`]
/// and adds BER `encode` / `decode`.
#[pyclass(
    name = "RoutingInfoForSmRes",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyRoutingInfoForSmRes {
    inner: RoutingInfoForSmRes,
}

#[pymethods]
impl PyRoutingInfoForSmRes {
    /// Builds the result from IMSI bytes and a `LocationInfoWithLmsi` object.
    #[new]
    fn new(imsi: Vec<u8>, location_info_with_lmsi: PyLocationInfoWithLmsi) -> Self {
        // Unwrap the Python-facing wrapper to get at the underlying value.
        let PyLocationInfoWithLmsi { inner: location } = location_info_with_lmsi;
        let inner = RoutingInfoForSmRes {
            imsi: imsi.into(),
            location_info_with_lmsi: location,
        };
        Self { inner }
    }

    /// IMSI as raw bytes.
    #[getter]
    fn imsi<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.imsi;
        PyBytes::new(py, raw)
    }

    /// A fresh `LocationInfoWithLmsi` wrapper holding a clone of the stored value.
    #[getter]
    fn location_info_with_lmsi(&self) -> PyLocationInfoWithLmsi {
        let inner = self.inner.location_info_with_lmsi.clone();
        PyLocationInfoWithLmsi { inner }
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        let inner: RoutingInfoForSmRes = ber_decode(data)?;
        Ok(Self { inner })
    }

    /// Shows the hex-encoded IMSI and network node number.
    fn __repr__(&self) -> String {
        let imsi_hex = hex::encode(&self.inner.imsi);
        let node_hex = hex::encode(&self.inner.location_info_with_lmsi.network_node_number);
        format!(
            "RoutingInfoForSmRes(imsi={}, node={})",
            imsi_hex, node_hex
        )
    }
}
/// Python class `MoForwardSmArg`: wraps the crate's [`MoForwardSmArg`] and adds
/// BER `encode` / `decode`.
#[pyclass(
    name = "MoForwardSmArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyMoForwardSmArg {
    inner: MoForwardSmArg,
}

#[pymethods]
impl PyMoForwardSmArg {
    /// Builds the argument from destination/originator address objects and the
    /// user-information bytes. `imsi` is keyword-only and optional.
    #[new]
    #[pyo3(signature = (sm_rp_da, sm_rp_oa, sm_rp_ui, *, imsi = None))]
    fn new(
        sm_rp_da: PySmRpDa,
        sm_rp_oa: PySmRpOa,
        sm_rp_ui: Vec<u8>,
        imsi: Option<Vec<u8>>,
    ) -> Self {
        Self {
            inner: MoForwardSmArg {
                sm_rp_da: sm_rp_da.inner,
                sm_rp_oa: sm_rp_oa.inner,
                sm_rp_ui: sm_rp_ui.into(),
                imsi: imsi.map(Into::into),
            },
        }
    }

    /// A fresh `SmRpDa` wrapper holding a clone of the stored destination address.
    #[getter]
    fn sm_rp_da(&self) -> PySmRpDa {
        PySmRpDa {
            inner: self.inner.sm_rp_da.clone(),
        }
    }

    /// A fresh `SmRpOa` wrapper holding a clone of the stored originator address.
    #[getter]
    fn sm_rp_oa(&self) -> PySmRpOa {
        PySmRpOa {
            inner: self.inner.sm_rp_oa.clone(),
        }
    }

    /// User-information payload as raw bytes.
    #[getter]
    fn sm_rp_ui<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.sm_rp_ui)
    }

    /// IMSI as raw bytes, or `None` when it is not set.
    ///
    /// Without this getter the value passed to the constructor (or obtained by
    /// `decode`) could not be read back from Python.
    #[getter]
    fn imsi<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        self.inner.imsi.as_ref().map(|v| PyBytes::new(py, v))
    }

    /// Operation code constant `op_codes::MO_FORWARD_SM`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::MO_FORWARD_SM
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        Ok(Self {
            inner: ber_decode(data)?,
        })
    }

    /// Shows both addresses via their `Display` output and the payload length.
    fn __repr__(&self) -> String {
        format!(
            "MoForwardSmArg(da={}, oa={}, ui_len={})",
            self.inner.sm_rp_da,
            self.inner.sm_rp_oa,
            self.inner.sm_rp_ui.len()
        )
    }
}
/// Python class `MtForwardSmArg`: wraps the crate's [`MtForwardSmArg`] and adds
/// BER `encode` / `decode`.
#[pyclass(
    name = "MtForwardSmArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyMtForwardSmArg {
    inner: MtForwardSmArg,
}

#[pymethods]
impl PyMtForwardSmArg {
    /// Builds the argument from destination/originator address objects and the
    /// user-information bytes.
    ///
    /// `more_messages_to_send` is keyword-only. Only `True` sets the marker;
    /// `False` and `None` both leave it absent.
    #[new]
    #[pyo3(signature = (sm_rp_da, sm_rp_oa, sm_rp_ui, *, more_messages_to_send = None))]
    fn new(
        sm_rp_da: PySmRpDa,
        sm_rp_oa: PySmRpOa,
        sm_rp_ui: Vec<u8>,
        more_messages_to_send: Option<bool>,
    ) -> Self {
        let more_marker = match more_messages_to_send {
            Some(true) => Some(()),
            Some(false) | None => None,
        };
        let PySmRpDa { inner: destination } = sm_rp_da;
        let PySmRpOa { inner: originator } = sm_rp_oa;
        let inner = MtForwardSmArg {
            sm_rp_da: destination,
            sm_rp_oa: originator,
            sm_rp_ui: sm_rp_ui.into(),
            more_messages_to_send: more_marker,
        };
        Self { inner }
    }

    /// A fresh `SmRpDa` wrapper holding a clone of the stored destination address.
    #[getter]
    fn sm_rp_da(&self) -> PySmRpDa {
        let inner = self.inner.sm_rp_da.clone();
        PySmRpDa { inner }
    }

    /// A fresh `SmRpOa` wrapper holding a clone of the stored originator address.
    #[getter]
    fn sm_rp_oa(&self) -> PySmRpOa {
        let inner = self.inner.sm_rp_oa.clone();
        PySmRpOa { inner }
    }

    /// User-information payload as raw bytes.
    #[getter]
    fn sm_rp_ui<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.sm_rp_ui;
        PyBytes::new(py, raw)
    }

    /// `True` when the marker is present, otherwise `None` (never `False`).
    #[getter]
    fn more_messages_to_send(&self) -> Option<bool> {
        match self.inner.more_messages_to_send {
            Some(()) => Some(true),
            None => None,
        }
    }

    /// Operation code constant `op_codes::MT_FORWARD_SM`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::MT_FORWARD_SM
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        let inner: MtForwardSmArg = ber_decode(data)?;
        Ok(Self { inner })
    }

    /// Shows both addresses via their `Display` output and the payload length.
    fn __repr__(&self) -> String {
        let ui_len = self.inner.sm_rp_ui.len();
        format!(
            "MtForwardSmArg(da={}, oa={}, ui_len={})",
            self.inner.sm_rp_da, self.inner.sm_rp_oa, ui_len
        )
    }
}
/// Python class `ReportSmDeliveryStatusArg`: wraps the crate's
/// [`ReportSmDeliveryStatusArg`] and adds BER `encode` / `decode`.
#[pyclass(
    name = "ReportSmDeliveryStatusArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyReportSmDeliveryStatusArg {
    inner: ReportSmDeliveryStatusArg,
}

#[pymethods]
impl PyReportSmDeliveryStatusArg {
    /// Builds the argument from raw byte strings and a numeric outcome.
    ///
    /// `outcome` must be `0` (memory capacity exceeded), `1` (absent
    /// subscriber) or `2` (successful transfer); any other value raises
    /// `MapError`.
    #[new]
    fn new(msisdn: Vec<u8>, service_centre_address: Vec<u8>, outcome: i64) -> PyResult<Self> {
        let sm_delivery_outcome = match outcome {
            0 => Ok(SmDeliveryOutcome::MemoryCapacityExceeded),
            1 => Ok(SmDeliveryOutcome::AbsentSubscriber),
            2 => Ok(SmDeliveryOutcome::SuccessfulTransfer),
            other => Err(MapError::new_err(format!(
                "invalid SM-DeliveryOutcome: {other} (expected 0, 1, or 2)"
            ))),
        }?;
        let inner = ReportSmDeliveryStatusArg {
            msisdn: msisdn.into(),
            service_centre_address: service_centre_address.into(),
            sm_delivery_outcome,
        };
        Ok(Self { inner })
    }

    /// MSISDN as raw bytes.
    #[getter]
    fn msisdn<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.msisdn;
        PyBytes::new(py, raw)
    }

    /// Service centre address as raw bytes.
    #[getter]
    fn service_centre_address<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.service_centre_address;
        PyBytes::new(py, raw)
    }

    /// Numeric form of the stored outcome, using the same 0/1/2 mapping the
    /// constructor accepts.
    #[getter]
    fn outcome(&self) -> i64 {
        match &self.inner.sm_delivery_outcome {
            SmDeliveryOutcome::SuccessfulTransfer => 2,
            SmDeliveryOutcome::AbsentSubscriber => 1,
            SmDeliveryOutcome::MemoryCapacityExceeded => 0,
        }
    }

    /// Operation code constant `op_codes::REPORT_SM_DELIVERY_STATUS`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::REPORT_SM_DELIVERY_STATUS
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        let inner: ReportSmDeliveryStatusArg = ber_decode(data)?;
        Ok(Self { inner })
    }

    /// Shows the hex-encoded MSISDN and the numeric outcome.
    fn __repr__(&self) -> String {
        let msisdn_hex = hex::encode(&self.inner.msisdn);
        let outcome_code = self.outcome();
        format!(
            "ReportSmDeliveryStatusArg(msisdn={}, outcome={})",
            msisdn_hex, outcome_code
        )
    }
}
/// Python class `UpdateLocationArg`: wraps the crate's [`UpdateLocationArg`]
/// and adds BER `encode` / `decode`.
#[pyclass(
    name = "UpdateLocationArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PyUpdateLocationArg {
    inner: UpdateLocationArg,
}

#[pymethods]
impl PyUpdateLocationArg {
    /// Builds the argument from raw byte strings.
    ///
    /// `lmsi` is keyword-only and optional. The `vlr_capability` field is
    /// never populated by this constructor.
    #[new]
    #[pyo3(signature = (imsi, msc_number, vlr_number, *, lmsi = None))]
    fn new(imsi: Vec<u8>, msc_number: Vec<u8>, vlr_number: Vec<u8>, lmsi: Option<Vec<u8>>) -> Self {
        Self {
            inner: UpdateLocationArg {
                imsi: imsi.into(),
                msc_number: msc_number.into(),
                vlr_number: vlr_number.into(),
                lmsi: lmsi.map(Into::into),
                vlr_capability: None,
            },
        }
    }

    /// IMSI as raw bytes.
    #[getter]
    fn imsi<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.imsi)
    }

    // The three getters below make every constructor field readable, so that
    // objects produced by `decode` can be inspected from Python.

    /// MSC number as raw bytes.
    #[getter]
    fn msc_number<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.msc_number)
    }

    /// VLR number as raw bytes.
    #[getter]
    fn vlr_number<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new(py, &self.inner.vlr_number)
    }

    /// LMSI as raw bytes, or `None` when it is not set.
    #[getter]
    fn lmsi<'py>(&self, py: Python<'py>) -> Option<Bound<'py, PyBytes>> {
        self.inner.lmsi.as_ref().map(|v| PyBytes::new(py, v))
    }

    /// Operation code constant `op_codes::UPDATE_LOCATION`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::UPDATE_LOCATION
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        Ok(Self {
            inner: ber_decode(data)?,
        })
    }

    /// Shows the hex-encoded IMSI.
    fn __repr__(&self) -> String {
        format!("UpdateLocationArg(imsi={})", hex::encode(&self.inner.imsi))
    }
}
/// Python class `SendAuthenticationInfoArg`: wraps the crate's
/// [`SendAuthenticationInfoArg`] and adds BER `encode` / `decode`.
#[pyclass(
    name = "SendAuthenticationInfoArg",
    module = "gsm_map._gsm_map",
    skip_from_py_object
)]
#[derive(Clone)]
pub struct PySendAuthenticationInfoArg {
    inner: SendAuthenticationInfoArg,
}

#[pymethods]
impl PySendAuthenticationInfoArg {
    /// Builds the argument from IMSI bytes and the requested vector count.
    ///
    /// `re_synchronisation_info` and `requesting_node_type` are never
    /// populated by this constructor.
    #[new]
    #[pyo3(signature = (imsi, number_of_requested_vectors))]
    fn new(imsi: Vec<u8>, number_of_requested_vectors: i64) -> Self {
        let inner = SendAuthenticationInfoArg {
            imsi: imsi.into(),
            number_of_requested_vectors: number_of_requested_vectors.into(),
            re_synchronisation_info: None,
            requesting_node_type: None,
        };
        Self { inner }
    }

    /// IMSI as raw bytes.
    #[getter]
    fn imsi<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        let raw = &self.inner.imsi;
        PyBytes::new(py, raw)
    }

    /// Operation code constant `op_codes::SEND_AUTHENTICATION_INFO`.
    #[getter]
    fn op_code(&self) -> i64 {
        op_codes::SEND_AUTHENTICATION_INFO
    }

    /// BER-encodes the wrapped value; raises `MapError` if encoding fails.
    fn encode(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
        ber_encode(&self.inner, py)
    }

    /// BER-decodes `data` into a new instance; raises `MapError` if decoding fails.
    #[staticmethod]
    fn decode(data: &[u8]) -> PyResult<Self> {
        let inner: SendAuthenticationInfoArg = ber_decode(data)?;
        Ok(Self { inner })
    }

    /// Shows the hex-encoded IMSI.
    fn __repr__(&self) -> String {
        let imsi_hex = hex::encode(&self.inner.imsi);
        format!("SendAuthenticationInfoArg(imsi={})", imsi_hex)
    }
}
/// Returns the name that the crate's `operation_name` lookup gives for `op_code`.
#[pyfunction]
fn op_name(op_code: i64) -> &'static str {
    let name: &'static str = operation_name(op_code);
    name
}
fn add_contents(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add("MapError", m.py().get_type::<MapError>())?;
m.add_class::<PySmRpDa>()?;
m.add_class::<PySmRpOa>()?;
m.add_class::<PyLocationInfoWithLmsi>()?;
m.add_class::<PyRoutingInfoForSmArg>()?;
m.add_class::<PyRoutingInfoForSmRes>()?;
m.add_class::<PyMoForwardSmArg>()?;
m.add_class::<PyMtForwardSmArg>()?;
m.add_class::<PyReportSmDeliveryStatusArg>()?;
m.add_class::<PyUpdateLocationArg>()?;
m.add_class::<PySendAuthenticationInfoArg>()?;
m.add_function(wrap_pyfunction!(op_name, m)?)?;
m.add(
"OP_SEND_ROUTING_INFO_FOR_SM",
op_codes::SEND_ROUTING_INFO_FOR_SM,
)?;
m.add("OP_MO_FORWARD_SM", op_codes::MO_FORWARD_SM)?;
m.add("OP_MT_FORWARD_SM", op_codes::MT_FORWARD_SM)?;
m.add(
"OP_REPORT_SM_DELIVERY_STATUS",
op_codes::REPORT_SM_DELIVERY_STATUS,
)?;
m.add("OP_ALERT_SERVICE_CENTRE", op_codes::ALERT_SERVICE_CENTRE)?;
m.add("OP_INFORM_SERVICE_CENTRE", op_codes::INFORM_SERVICE_CENTRE)?;
m.add("OP_READY_FOR_SM", op_codes::READY_FOR_SM)?;
m.add("OP_UPDATE_LOCATION", op_codes::UPDATE_LOCATION)?;
m.add(
"OP_SEND_AUTHENTICATION_INFO",
op_codes::SEND_AUTHENTICATION_INFO,
)?;
m.add("DELIVERY_OUTCOME_MEMORY_CAPACITY_EXCEEDED", 0i64)?;
m.add("DELIVERY_OUTCOME_ABSENT_SUBSCRIBER", 1i64)?;
m.add("DELIVERY_OUTCOME_SUCCESSFUL_TRANSFER", 2i64)?;
Ok(())
}
/// Extension-module entry point for `_gsm_map`; delegates to [`add_contents`].
#[pymodule]
fn _gsm_map(m: &Bound<'_, PyModule>) -> PyResult<()> {
    add_contents(m)?;
    Ok(())
}
/// Creates a new module named `gsm_map`, fills it via [`add_contents`] and
/// attaches it to `parent` as the attribute `gsm_map`.
///
/// # Errors
///
/// Propagates any Python error raised while creating, populating or attaching
/// the module.
pub fn register(py: Python<'_>, parent: &Bound<'_, PyModule>) -> PyResult<()> {
    let submodule = PyModule::new(py, "gsm_map")?;
    add_contents(&submodule)?;
    parent.setattr("gsm_map", &submodule)
}