nrf_modem/
at_notifications.rs

1use crate::{
2    error::{Error, ErrorSource},
3    waker_node_list::{WakerNode, WakerNodeList},
4};
5use arrayvec::{ArrayString, ArrayVec};
6use core::{cell::RefCell, marker::PhantomPinned, task::Poll};
7use critical_section::Mutex;
8
9static WAKER_NODE_LIST: Mutex<RefCell<WakerNodeList<dyn NotificationBuffer>>> =
10    Mutex::new(RefCell::new(WakerNodeList::new()));
11
12pub(crate) unsafe extern "C" fn at_notification_handler(notif: *const core::ffi::c_char) {
13    #[cfg(feature = "defmt")]
14    defmt::trace!(
15        "AT notification <- {}",
16        core::ffi::CStr::from_ptr(notif as *const _)
17            .to_str()
18            .unwrap()
19    );
20
21    critical_section::with(|cs| {
22        WAKER_NODE_LIST
23            .borrow_ref_mut(cs)
24            .wake_all(|c| c.write(notif.cast()))
25    });
26}
27
28pub(crate) fn initialize() -> Result<(), Error> {
29    unsafe {
30        nrfxlib_sys::nrf_modem_at_notif_handler_set(Some(at_notification_handler)).into_result()?;
31    }
32
33    Ok(())
34}
35
36/// An async stream of all AT notifications.
37///
38/// Implements the [futures::Stream] trait for polling.
39pub struct AtNotificationStream<const CAP: usize, const COUNT: usize> {
40    buffer: ArrayVec<ArrayString<CAP>, COUNT>,
41    waker_node: Option<WakerNode<dyn NotificationBuffer>>,
42    _phantom: PhantomPinned,
43}
44
45impl<const CAP: usize, const COUNT: usize> AtNotificationStream<CAP, COUNT> {
46    /// Creates a new stream that receives AT notifications.
47    ///
48    /// To access all the nice iterator functions, use [futures::StreamExt].
49    pub async fn new() -> Self {
50        Self {
51            buffer: Default::default(),
52            waker_node: None,
53            _phantom: Default::default(),
54        }
55    }
56
57    /// Futures are lazy and can only register themselves once polled.
58    /// Call this function if you want to register this stream early so that it can already receive notifications.
59    pub async fn register(self: core::pin::Pin<&mut Self>) {
60        let this = unsafe { self.get_unchecked_mut() };
61
62        core::future::poll_fn(|cx| {
63            // Initialize the waker node
64            let buffer_ptr = &mut this.buffer as *mut dyn NotificationBuffer;
65            let waker_node = this
66                .waker_node
67                .get_or_insert_with(|| WakerNode::new(Some(buffer_ptr), cx.waker().clone()));
68
69            critical_section::with(|cs| unsafe {
70                WAKER_NODE_LIST
71                    .borrow_ref_mut(cs)
72                    .append_node(waker_node as *mut _)
73            });
74
75            Poll::Ready(())
76        })
77        .await;
78    }
79}
80
81impl<const CAP: usize, const COUNT: usize> futures::Stream for AtNotificationStream<CAP, COUNT> {
82    type Item = ArrayString<CAP>;
83
84    fn poll_next(
85        self: core::pin::Pin<&mut Self>,
86        cx: &mut core::task::Context<'_>,
87    ) -> Poll<Option<Self::Item>> {
88        let this = unsafe { self.get_unchecked_mut() };
89
90        if this.waker_node.is_none() {
91            // Initialize the waker node
92            let buffer_ptr = &mut this.buffer as *mut dyn NotificationBuffer;
93            let waker_node = this
94                .waker_node
95                .get_or_insert_with(|| WakerNode::new(Some(buffer_ptr), cx.waker().clone()));
96
97            critical_section::with(|cs| unsafe {
98                WAKER_NODE_LIST
99                    .borrow_ref_mut(cs)
100                    .append_node(waker_node as *mut _)
101            });
102        }
103
104        critical_section::with(|_| {
105            if !this.buffer.is_empty() {
106                Poll::Ready(Some(this.buffer.remove(0)))
107            } else {
108                Poll::Pending
109            }
110        })
111    }
112}
113
114impl<const CAP: usize, const COUNT: usize> Drop for AtNotificationStream<CAP, COUNT> {
115    fn drop(&mut self) {
116        if let Some(waker_node) = self.waker_node.as_mut() {
117            critical_section::with(|cs| unsafe {
118                WAKER_NODE_LIST
119                    .borrow_ref_mut(cs)
120                    .remove_node(waker_node as *mut _)
121            });
122        }
123    }
124}
125
126trait NotificationBuffer {
127    fn write(&mut self, notif: *const u8);
128}
129
130impl<const CAP: usize, const COUNT: usize> NotificationBuffer
131    for ArrayVec<ArrayString<CAP>, COUNT>
132{
133    fn write(&mut self, mut notif: *const u8) {
134        if self.is_full() {
135            #[cfg(feature = "defmt")]
136            defmt::warn!("Notification buffer is full");
137
138            return;
139        }
140
141        let mut string = ArrayString::new();
142
143        while !string.is_full() && unsafe { *notif != 0 } {
144            let c = unsafe { char::from_u32_unchecked(((*notif) & 0x7F) as u32) };
145            string.push(c);
146
147            notif = unsafe { notif.add(1) };
148        }
149
150        #[cfg(feature = "defmt")]
151        if string.is_full() && unsafe { *notif != 0 } {
152            let mut missing_chars = 0;
153
154            while unsafe { *notif != 0 } {
155                notif = unsafe { notif.add(1) };
156                missing_chars += 1;
157            }
158
159            defmt::warn!(
160                "AT notification got truncated. Missing {} chars",
161                missing_chars
162            );
163        }
164
165        self.push(string);
166    }
167}