1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use crate::{
    error::{Error, ErrorSource},
    waker_node_list::{WakerNode, WakerNodeList},
};
use arrayvec::{ArrayString, ArrayVec};
use core::{cell::RefCell, marker::PhantomPinned, task::Poll};
use critical_section::Mutex;

static WAKER_NODE_LIST: Mutex<RefCell<WakerNodeList<dyn NotificationBuffer>>> =
    Mutex::new(RefCell::new(WakerNodeList::new()));

pub(crate) unsafe extern "C" fn at_notification_handler(notif: *const core::ffi::c_char) {
    let notif = notif as *const u8;

    #[cfg(feature = "defmt")]
    defmt::trace!(
        "AT notification <- {}",
        core::ffi::CStr::from_ptr(notif as *const _)
            .to_str()
            .unwrap()
    );

    critical_section::with(|cs| {
        WAKER_NODE_LIST
            .borrow_ref_mut(cs)
            .wake_all(|c| c.write(notif))
    });
}

pub(crate) fn initialize() -> Result<(), Error> {
    unsafe {
        nrfxlib_sys::nrf_modem_at_notif_handler_set(Some(at_notification_handler)).into_result()?;
    }

    Ok(())
}

/// An async stream of all AT notifications.
///
/// Implements the [futures::Stream] trait for polling.
pub struct AtNotificationStream<const CAP: usize, const COUNT: usize> {
    buffer: ArrayVec<ArrayString<CAP>, COUNT>,
    waker_node: Option<WakerNode<dyn NotificationBuffer>>,
    _phantom: PhantomPinned,
}

impl<const CAP: usize, const COUNT: usize> AtNotificationStream<CAP, COUNT> {
    /// Creates a new stream that receives AT notifications.
    ///
    /// To access all the nice iterator functions, use [futures::StreamExt].
    pub async fn new() -> Self {
        Self {
            buffer: Default::default(),
            waker_node: None,
            _phantom: Default::default(),
        }
    }

    /// Futures are lazy and can only register themselves once polled.
    /// Call this function if you want to register this stream early so that it can already receive notifications.
    pub async fn register(self: core::pin::Pin<&mut Self>) {
        let this = unsafe { self.get_unchecked_mut() };

        core::future::poll_fn(|cx| {
            // Initialize the waker node
            let buffer_ptr = &mut this.buffer as *mut dyn NotificationBuffer;
            let waker_node = this
                .waker_node
                .get_or_insert_with(|| WakerNode::new(Some(buffer_ptr), cx.waker().clone()));

            critical_section::with(|cs| unsafe {
                WAKER_NODE_LIST
                    .borrow_ref_mut(cs)
                    .append_node(waker_node as *mut _)
            });

            Poll::Ready(())
        })
        .await;
    }
}

impl<const CAP: usize, const COUNT: usize> futures::Stream for AtNotificationStream<CAP, COUNT> {
    type Item = ArrayString<CAP>;

    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = unsafe { self.get_unchecked_mut() };

        if this.waker_node.is_none() {
            // Initialize the waker node
            let buffer_ptr = &mut this.buffer as *mut dyn NotificationBuffer;
            let waker_node = this
                .waker_node
                .get_or_insert_with(|| WakerNode::new(Some(buffer_ptr), cx.waker().clone()));

            critical_section::with(|cs| unsafe {
                WAKER_NODE_LIST
                    .borrow_ref_mut(cs)
                    .append_node(waker_node as *mut _)
            });
        }

        critical_section::with(|_| {
            if !this.buffer.is_empty() {
                Poll::Ready(Some(this.buffer.remove(0)))
            } else {
                Poll::Pending
            }
        })
    }
}

impl<const CAP: usize, const COUNT: usize> Drop for AtNotificationStream<CAP, COUNT> {
    fn drop(&mut self) {
        if let Some(waker_node) = self.waker_node.as_mut() {
            critical_section::with(|cs| unsafe {
                WAKER_NODE_LIST
                    .borrow_ref_mut(cs)
                    .remove_node(waker_node as *mut _)
            });
        }
    }
}

trait NotificationBuffer {
    fn write(&mut self, notif: *const u8);
}

impl<const CAP: usize, const COUNT: usize> NotificationBuffer
    for ArrayVec<ArrayString<CAP>, COUNT>
{
    fn write(&mut self, mut notif: *const u8) {
        if self.is_full() {
            #[cfg(feature = "defmt")]
            defmt::warn!("Notification buffer is full");

            return;
        }

        let mut string = ArrayString::new();

        while !string.is_full() && unsafe { *notif != 0 } {
            let c = unsafe { char::from_u32_unchecked(((*notif) & 0x7F) as u32) };
            string.push(c);

            notif = unsafe { notif.add(1) };
        }

        #[cfg(feature = "defmt")]
        if string.is_full() && unsafe { *notif != 0 } {
            let mut missing_chars = 0;

            while unsafe { *notif != 0 } {
                notif = unsafe { notif.add(1) };
                missing_chars += 1;
            }

            defmt::warn!(
                "AT notification got truncated. Missing {} chars",
                missing_chars
            );
        }

        self.push(string);
    }
}