ads_client/
lib.rs

1//! Welcome to the ADS client library.
2//! 
3//! This create enables communication over the [Beckhoff ADS](https://infosys.beckhoff.com/content/1033/tcinfosys3/11291871243.html) protocoll.
4//! 
5//! The ADS client is used to work beside a 
6//! [TC1000 ADS router](https://www.beckhoff.com/en-en/products/automation/twincat/tc1xxx-twincat-3-base/tc1000.html)
7//! which is part of every TwinCAT installation. The client requires at least TwinCAT Version 3.1.4024.x.
8//! 
9//! This crate grants access to the following ADS commands:
10//! 
11//! - [Client::read_state]
12//! - [Client::read]
13//! - [Client::write]
14//! - [Client::read_write]
15//! - [Client::write_control]
16//! - [Client::add_device_notification]
17//! - [Client::delete_device_notification]
18//! - [Client::read_device_info]
19//! 
20//! The methods are implemented asynchronous and non-blocking based on the [tokio](https://tokio.rs/) runtime.
21//! 
22//! # Usage
23//! 
24//! Checkout the [example section](https://github.com/hANSIc99/ads_client/tree/main/examples) in the repsoitory.
25
26#![allow(unused)]
27
28#[macro_use]
29mod misc;
30mod command_manager;
31mod command_cleaner;
32mod ads_read;
33mod ads_write;
34mod ads_read_state;
35mod ads_read_write;
36mod ads_add_device_notification;
37mod ads_delete_device_notification;
38mod ads_write_control;
39mod ads_read_device_info;
40
41use std::time::{Instant, Duration};
42use std::io;
43use std::net::SocketAddr;
44use std::mem::size_of_val;
45use std::sync::{Arc, Mutex, atomic::{AtomicU16, Ordering}};
46use tokio::net::TcpStream;
47use tokio::{runtime, stream};
48use tokio::io::{ReadHalf, WriteHalf};
49use tokio::io::{AsyncWriteExt, AsyncReadExt};
50use tokio::time::sleep;
51use log::{trace, debug, info, warn, error};
52use bytes::{Bytes, BytesMut};
53
54use command_cleaner::CommandCleaner;
55use command_manager::CommandManager;
56
57use misc::{AdsCommand, Handle, HandleData, NotHandle, AmsNetId, AdsStampHeader, AdsNotificationSample};
58pub use misc::{AdsTimeout, AdsNotificationAttrib, AdsTransMode, StateInfo, DeviceStateInfo, AdsState, Notification, Result, AdsError, AdsErrorCode}; // Re-export type
59
60
61/// Size of the AMS/TCP + ADS headers
62// https://infosys.beckhoff.com/content/1033/tc3_ads_intro/115845259.html?id=6032227753916597086
63const HEADER_SIZE           : usize = 38;
64const AMS_HEADER_SIZE       : usize = HEADER_SIZE - 6; // without leading nulls and length
65const LEN_READ_REQ          : usize = 12;
66const LEN_RW_REQ_MIN        : usize = 16;
67const LEN_W_REQ_MIN         : usize = 12;
68const LEN_ADD_DEV_NOT       : usize = 38;
69const LEN_STAMP_HEADER_MIN  : usize = 12;   // Time Stamp [8] + No Samples [4]
70const LEN_NOT_SAMPLE_MIN    : usize = 8;    // Notification Handle [4] + Sample Size [4]
71const LEN_DEL_DEV_NOT       : usize = 4;
72const LEN_WR_CTRL_MIN       : usize = 8;
73
74enum ProcessStateMachine{
75    ReadHeader,
76    ReadPayload { len_payload: usize, err_code: u32, invoke_id: u32, cmd: AdsCommand}
77}
78
79#[derive(Debug)]
80pub struct ClientBuilder<'a> {
81    addr: &'a str,
82    port: u16,
83    timeout: AdsTimeout,
84    retry_delay: Option<Duration>,
85}
86
87impl<'a> ClientBuilder<'a> {
88    pub fn new(addr: &'a str, port: u16) -> Self {
89        Self { addr, port, timeout: AdsTimeout::DefaultTimeout, retry_delay: None }
90    }
91
92    pub fn set_timeout(mut self, timeout: AdsTimeout) -> Self {
93        self.timeout = timeout;
94        self
95    }
96
97    pub fn set_retry_delay(mut self, retry_delay: Option<Duration>) -> Self {
98        self.retry_delay = retry_delay;
99        self
100    }
101
102    pub async fn build(self) -> Result<Client> {
103        Client::new(self.addr, self.port, self.timeout, self.retry_delay).await
104    }
105}
106
107/// An ADS client to use in combination with the [TC1000 ADS router](https://www.beckhoff.com/en-en/products/automation/twincat/tc1xxx-twincat-3-base/tc1000.html).
108/// 
109/// The client opens a port on the local ADS router in order to submit ADS requests.
110/// Use the [Client::new] method to create an instance.
111#[derive(Debug)]
112pub struct Client {
113    _dst_addr       : AmsNetId,
114    _dst_port       : u16,
115    _src_addr       : AmsNetId,
116    _src_port       : u16,
117    timeout         : u64, // ADS Timeout [s]
118    socket_wrt      : Arc<Mutex<WriteHalf<TcpStream>>>,
119    handles         : Arc<Mutex<Vec<Handle>>>, // Internal stack of Handles (^=ADS CommandsInvoke) for decoupling requests and responses
120    not_handles     : Arc<Mutex<Vec<NotHandle>>>,
121    ams_header      : [u8; HEADER_SIZE],
122    hdl_cnt         : Arc<AtomicU16>
123}
124
125// TODO: Implement Defaul trait
126// https://doc.rust-lang.org/std/default/trait.Default.html
127
128
129impl Client {
130   
131    async fn connect(answer: &mut [u8]) -> Result<TcpStream> {
132        let stream  = TcpStream::connect(&SocketAddr::from(([127, 0, 0, 1], 48898))).await.map_err::<AdsError, _>(|err| err.into() )?;
133        let handshake : [u8; 8] = [0x00, 0x10, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00 ];
134
135        // WRITING
136        loop {
137            // Wait for the socket to be writable
138            stream.writable().await.map_err::<AdsError, _>(|err| err.into() )?;
139    
140            // Try to write data, this may still fail with `WouldBlock`
141            // if the readiness event is a false positive.
142            match stream.try_write(&handshake) {
143                Ok(_) => {
144                    break;
145                }
146                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
147                    warn!("TcpStream: false positive reaction / stream was not yet ready for reading {:?}", e);
148                    continue;
149                }
150                Err(e) => {
151                    error!("Failed to write to socket");
152                    return Err(e.into());
153                }
154            }
155        }
156
157        // READING
158        loop {
159            // Wait for the socket to be readable
160            stream.readable().await?;
161    
162            // Try to read data, this may still fail with `WouldBlock`
163            // if the readiness event is a false positive.
164            match stream.try_read(answer) {
165                Ok(0) => break,
166                Ok(n) => {
167                    if n == 14 {
168                        info!("Connection to AMS router established");
169                        break;
170                    } else {
171                        error!("Router port disabled – TwinCAT system service not started.");
172                        return Err(AdsError{n_error : 18, s_msg : String::from("Port disabled – TwinCAT system service not started.")});
173                    }
174                }
175                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176                    warn!("TcpStream: false positive reaction / stream was not yet ready for writing {:?}", e);
177                    continue;
178                }
179                Err(_) => {
180                    error!("Router port disabled – TwinCAT system service not started.");
181                    return Err(AdsError{n_error : 18, s_msg : String::from("Port disabled – TwinCAT system service not started.")});
182                }
183            }
184        }
185
186        Ok(stream)
187    } 
188
189    async fn process_response(handles: Arc<Mutex<Vec<Handle>>>, not_handles: Arc<Mutex<Vec<NotHandle>>>, mut rd_stream : ReadHalf<TcpStream>, retry_delay: Option<Duration>) {
190        
191        let mut state = ProcessStateMachine::ReadHeader;
192        let rt = runtime::Handle::current();
193        
194        loop {
195            match &mut state {
196
197                ProcessStateMachine::ReadHeader => {
198
199                    let mut header_buf : [u8; HEADER_SIZE] = [0; HEADER_SIZE];
200
201                    match rd_stream.read(&mut header_buf).await {
202                        Ok(0) => {
203                           warn!("[0] Incoming ADS response - no bytes to read");
204                        }
205                        Ok(_) => {
206                            let len_payload = Client::extract_length(&header_buf).unwrap_or_default();
207                            let err_code = Client::extract_error_code(&header_buf).unwrap_or_default();
208                            let invoke_id   = Client::extract_invoke_id(&header_buf).unwrap_or_default();
209                            let ads_cmd     = Client::extract_cmd_tyte(&header_buf).unwrap_or_default();
210
211                            if(len_payload == 0){
212                                warn!("Invoke id {}: No ADS payload available - skip", invoke_id);
213                                continue;
214                            }
215
216                            trace!("[0] Incoming ADS response with {:?} byte payload", len_payload);
217
218                            state = ProcessStateMachine::ReadPayload{
219                                len_payload : len_payload,
220                                err_code    : err_code,
221                                invoke_id   : invoke_id,
222                                cmd         : ads_cmd
223                            };
224
225                        }
226                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
227                            warn!("TcpStream: false positive reaction / stream was not yet ready for reading{:?}", e);
228                            continue;
229                        }
230                        Err(e) => {
231                            error!("Socket Error (0x1): {:?}", e);
232                            //panic!("Socket Error (0x1): {:?}", e);
233                            if let Some(ref delay) = retry_delay {
234                                sleep(*delay).await;
235                            }
236                        }
237                    }
238                }
239                
240                ProcessStateMachine::ReadPayload {len_payload, err_code, invoke_id, cmd} => {
241                    
242                    let mut payload = BytesMut::with_capacity(*len_payload);
243
244                    match rd_stream.read_buf(&mut payload).await {
245                        Ok(0) => {
246                            info!("[1] ADS response {:?}, Invoke ID: {:?}: - zero payload", cmd, invoke_id);
247                            state = ProcessStateMachine::ReadHeader;
248                        }
249                        Ok(_) => {
250                            
251                            let buf = payload.freeze(); // Convert to Bytes
252                            match cmd {
253                                AdsCommand::DeviceNotification => {
254                                    trace!("[1] Processing device notification");
255                                    let _not_handles = Arc::clone(&not_handles); 
256                                    rt.spawn(Client::process_device_notification(_not_handles, buf));
257
258                                },
259                                _ => {
260                                    trace!("[1] Processing ADS response");
261                                    let _handles = Arc::clone(&handles);
262                                    rt.spawn(Client::process_command(*err_code, *invoke_id, _handles, buf));
263                                }
264
265                            };
266
267                            state = ProcessStateMachine::ReadHeader;
268                        }
269                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
270                            warn!("ADS command {:?}, Invoke ID: {:?}: - WouldBlock error during reading occured", cmd, invoke_id);
271                            continue;
272                        }
273                        Err(e) => {
274                            error!("ADS command {:?}, Invoke ID: {:?}: - Error occurred: {:?}", cmd, invoke_id, e);
275                            //panic!("Socket Error (0x1): {:?}", e);
276                            if let Some(ref delay) = retry_delay {
277                                sleep(*delay).await;
278                            }
279                        }
280                    } // match
281                }
282            } // match
283        } // loop
284    } // fn
285
286    async fn socket_write(&self, data: &[u8] ) -> Result<()> {
287
288                let a_wrt_stream = Arc::clone(&self.socket_wrt);
289                {
290                    let mut wrt_stream = a_wrt_stream.lock();
291
292                    match wrt_stream {
293                        Ok(ref mut stream) => {
294                            stream.write(data).await?;
295                        },
296                        Err(_) => {
297                            return Err( AdsError { n_error : 10, s_msg : String::from("Writing to Tcp Stream socket failed") } );
298                        }
299                    }
300                }
301                //Err(Box::new(AdsError{ n_error : 1792 })) // DEBUG
302                Ok(())          
303    }
304    
305    /// Create a new instance of an ADS client.
306    /// 
307    /// - `addr` AmsNetId of the target system
308    /// - `port` ADS port number to communicate with
309    /// - `timeout` Value for ADS timeout value ([AdsTimeout::DefaultTimeout] corresponds to 5s)
310    /// 
311    /// # Example
312    /// ```rust
313    /// use ads_client::{ClientBuilder, Result};
314    /// #[tokio::main]
315    /// async fn main() -> Result<()> {
316    ///     let ads_client =  ClientBuilder::new("5.80.201.232.1.1", 851).build().await?;
317    ///     Ok(())
318    /// }
319    /// ```
320    async fn new(addr : &str, port : u16, timeout : AdsTimeout, retry_delay: Option<Duration>) -> Result<Self> {
321        let state_flag : u16 = 4;
322        let error_code : u32 = 0;
323        let mut b_vec = Vec::<u8>::new();
324
325        // BAUSTELLE // Pass ADS Address
326        for s_byte in addr.split('.') {
327            // https://doc.rust-lang.org/rust-by-example/error/multiple_error_types/reenter_question_mark.html
328            let n_byte = s_byte.parse::<u8>()?;
329            b_vec.push(n_byte);
330        }
331
332        let timeout = match timeout {
333            AdsTimeout::DefaultTimeout => 5,
334            AdsTimeout::CustomTimeout(time) => time
335        };
336
337        let hdl_rt = runtime::Handle::current();
338
339        let mut answer : [u8; 14] = [0; 14];
340
341        let _stream = Client::connect(&mut answer).await?;
342        info!("ADS client port opened: {}", u16::from_ne_bytes(answer[12..14].try_into().unwrap_or_default()));
343
344        // Split the stream into a read and write part
345        //
346        // Read-half goes to process_response()
347        // Write-half goes to Self
348
349        let (read, write) = tokio::io::split(_stream);
350
351        let a_socket_wrt = Arc::new(Mutex::new(write));
352
353        // Create atomic instances of the handle vector
354        let a_handles = Arc::new(Mutex::new( Vec::<Handle>::new() ));
355        let a_not_handles =  Arc::new(Mutex::new( Vec::<NotHandle>::new() ));
356
357        // Process incoming ADS responses
358        let response_vector_a  = Arc::clone(&a_handles);
359        let not_response_vector_a = Arc::clone(&a_not_handles);
360        hdl_rt.spawn(Client::process_response(response_vector_a, not_response_vector_a, read, retry_delay));
361
362        // Instantiate and spawn the CommandCleanter
363        let response_vector_b = Arc::clone(&a_handles);
364        hdl_rt.spawn(CommandCleaner::new(1, timeout, response_vector_b));
365
366        Ok(Self {
367            _dst_addr    : b_vec.clone().try_into().expect("AmsNetId consist of exact 6 bytes"), // https://stackoverflow.com/questions/25428920/how-to-get-a-slice-as-an-array-in-rust
368            _dst_port    : port,
369            _src_addr    : [answer[6], answer[7], answer[8], answer[9], answer[10], answer[11]],
370            _src_port    : u16::from_ne_bytes(answer[12..14].try_into().expect("Parsing source port failed")),
371            timeout      : timeout,
372            socket_wrt   : a_socket_wrt,
373            handles      : a_handles,
374            not_handles  : a_not_handles,
375            ams_header      : [
376                0, // Reserved
377                0,
378                0, // Header size + playload
379                0,
380                0,
381                0,
382                b_vec[0], // Target NetId
383                b_vec[1],
384                b_vec[2],
385                b_vec[3],
386                b_vec[4],
387                b_vec[5],
388                u16_low_byte!(port), // Target port
389                u16_high_byte!(port), 
390                answer[6], //  Source NetId
391                answer[7],
392                answer[8],
393                answer[9],
394                answer[10],
395                answer[11],
396                answer[12], // Source Port
397                answer[13], 
398                0, // Command-Id
399                0, 
400                u16_low_byte!(state_flag), // State flags
401                u16_high_byte!(state_flag), 
402                0, // Length
403                0,
404                0,
405                0, 
406                u32_lw_lb!(error_code), // Error code
407                u32_lw_hb!(error_code),
408                u32_hw_lb!(error_code),
409                u32_hw_hb!(error_code), 
410                0, // Invoke Id
411                0,
412                0,
413                0
414            ],
415            hdl_cnt         : Arc::new(AtomicU16::new(1))
416        })
417    }
418
419    fn register_command_handle(&self, invoke_id : u32, cmd : AdsCommand){
420        let a_handles = Arc::clone(&self.handles);
421
422        let rs_req_hdl = Handle {
423            cmd_type  : cmd,
424            invoke_id : invoke_id,
425            data      : HandleData::default(),
426            timestamp : Instant::now(),
427        };
428    
429        {
430            let mut handles = a_handles.lock().expect("Threading Error");
431            handles.push(rs_req_hdl);
432        }
433    }
434
435    fn register_not_handle(&self, not_hdl: u32, callback: Notification, user_data: Option<&Arc<Mutex<BytesMut>>>) {
436        let a_not_handles = Arc::clone(&self.not_handles);
437
438        let not_hdl = NotHandle {
439            callback  : callback,
440            not_hdl   : not_hdl,
441            user_data : user_data.and_then(|arc_bytes| Some(Arc::clone(arc_bytes)) )
442        };
443
444        {
445            let mut not_handles = a_not_handles.lock().expect("Threading Error");
446            not_handles.push(not_hdl);
447        }
448    }
449
450    fn create_cmd_man_future(&self, invoke_id: u32) -> CommandManager {
451        let a_handles = Arc::clone(&self.handles);
452        CommandManager::new(self.timeout, invoke_id, a_handles)
453    }
454
455    fn create_invoke_id(&self) -> u32 {
456        u32::from(self.hdl_cnt.fetch_add(1, Ordering::SeqCst))
457    }
458
459    fn c_init_ams_header(&self, invoke_id : u32, length_payload : Option<u32>, cmd : AdsCommand) -> [u8; HEADER_SIZE] {
460        let length_payload = length_payload.unwrap_or(0);
461        let length_header : u32 = AMS_HEADER_SIZE as u32 + length_payload;
462
463        let mut ams_header : [u8; HEADER_SIZE] = self.ams_header;
464        // length header + payload
465        ams_header[2..6].copy_from_slice(&length_header.to_ne_bytes());
466        // command id
467        ams_header[22..24].copy_from_slice(&(cmd as u16).to_ne_bytes());
468        // length payload
469        ams_header[26..30].copy_from_slice(&length_payload.to_ne_bytes());
470        // invoke Id
471        ams_header[34..38].copy_from_slice(&invoke_id.to_ne_bytes());
472
473        ams_header
474    }
475
476    fn eval_return_code(answer: &[u8]) -> Result<u32> {
477        let ret_code = u32::from_ne_bytes(answer[0..4].try_into()?);
478
479        if ret_code != 0 {
480            Err(AdsError{ n_error : ret_code, s_msg : String::from("Errorcode of ADS response") }) // TODO Add text to error codes
481        } else {
482            Ok(ret_code)
483        }
484    }
485
486    fn eval_ams_error(ams_err : u32) -> Result<()> {
487        if ams_err != 0 {
488            return Err(AdsError{n_error : ams_err, s_msg : String::from("Errorcode of ADS response") });
489        }
490        Ok(())
491    }
492
493    fn extract_error_code(answer: &[u8]) -> Result<u32> {
494        Ok(u32::from_ne_bytes(answer[HEADER_SIZE-8..HEADER_SIZE-4].try_into()?))
495    }
496
497    fn extract_invoke_id(answer: &[u8]) -> Result<u32> {
498        Ok(u32::from_ne_bytes(answer[HEADER_SIZE-4..HEADER_SIZE].try_into()?))
499    }
500
501    fn extract_cmd_tyte(answer: &[u8]) -> Result<AdsCommand>{
502        u16::from_ne_bytes(answer[HEADER_SIZE-16..HEADER_SIZE-14].try_into()?).try_into()
503    }
504
505    fn extract_length(answer: &[u8]) -> Result<usize>{
506        // length in AMS-Header https://infosys.beckhoff.com/content/1031/tc3_ads_intro/115847307.html
507        let tmp = u32::from_ne_bytes(answer[HEADER_SIZE-12..HEADER_SIZE-8].try_into()?);
508        //Err(AdsError{s_msg: String::from("test"),  n_error : 1212}) // DEBUG
509        Ok(usize::try_from(tmp)?)
510    }
511
512    fn not_extract_length(answer: &[u8]) -> Result<usize>{
513        let tmp = u32::from_ne_bytes(answer[0..4].try_into()?);
514        Ok(usize::try_from(tmp)?)
515    }
516
517    /// Panics if the input slice is less than 8 bytes
518    fn not_extract_stamps(answer: &[u8]) -> Result<u32>{
519        Ok(u32::from_ne_bytes(answer[4..8].try_into()?))
520    }
521
522    async fn process_command(err_code: u32, invoke_id: u32, cmd_register: Arc<Mutex<Vec<Handle>>>, data: Bytes){
523        trace!("[2] AdsCmd: Invoke ID: {}", invoke_id);
524
525        match cmd_register.lock() {
526            Ok(mut h) => {
527
528                if let Some(hdl) =  h.iter_mut().find( | hdl | hdl.invoke_id == invoke_id) {
529                    hdl.data.payload = Some(data);
530                    hdl.data.ams_err = err_code;
531                } else {
532                    warn!("No corresponding invoke ID found in CMD register - response will expire");
533                }
534
535            },
536            Err(e) => {
537                error!("Failed to lock command register - response dropped");
538                return;
539            }
540        };
541    }
542
543    async fn process_device_notification(not_register: Arc<Mutex<Vec<NotHandle>>>, data: Bytes){
544        trace!("[2] Start processing AdsDeviceNotification");
545        let stream_length = match Client::not_extract_length(&data){
546            Ok(size) => size,
547            Err(e) => {
548                error!("Failed to extract notification length - Notification dropped - {:?}", e);
549                return;
550            }
551        };
552
553        let no_stamps = match Client::not_extract_stamps(&data){
554            Ok(stamps) => stamps,
555            Err(e) => {
556                error!("Failed to extract number of stamps- Notification dropped - {:?}", e);
557                return;
558            }
559        };
560
561        let rt          = runtime::Handle::current();
562        // Maximum stamp_header_offset == stream_size - sizeof(stamps)
563        // ^= stream_size - 4
564        
565        // Calculate the last byte index of the AdsNotificaionStream (Length, Samples + AdsStampHeader)
566        let max_stamp_header_offset = stream_length + size_of_val(&no_stamps); 
567        let mut stamp_header_offset : usize = 8; // Start index of AdsNotificationStream
568
569
570        for _ in 0..no_stamps { // Iterate over AdsStampHeader 
571            // Return if there is no data beside of the AdsStampHeader consisting of time stamp [8] and no samples [4]
572            if (stamp_header_offset + LEN_STAMP_HEADER_MIN) > max_stamp_header_offset {
573                info!("Received Device Notification without sample data");
574                continue;
575            }
576           
577            
578            let stamp_header = AdsStampHeader {
579                timestamp : u64::from_ne_bytes(data[stamp_header_offset.. stamp_header_offset + 8]
580                                                .try_into()
581                                                .unwrap_or_default()),
582
583                samples : u32::from_ne_bytes(data[stamp_header_offset + 8..stamp_header_offset + 12]
584                                                .try_into()
585                                                .unwrap_or_default())
586            };
587
588            if (stamp_header == AdsStampHeader::default()){
589                info!("Empty AdsStampHeader - Continue with next stamp");
590                continue;
591            }
592
593            // Increase stamp header offset, move it to first AdsNotificaionSample (+= 12 byte)
594            stamp_header_offset += LEN_STAMP_HEADER_MIN;
595            // == 20 (after first call)
596
597            for _ in 0..stamp_header.samples {
598                // Return if there is not enough data
599                if (stamp_header_offset + LEN_NOT_SAMPLE_MIN) > max_stamp_header_offset {
600                    info!("[A] Not enough data in available in stream");
601                    return;
602                }
603
604                let not_sample = AdsNotificationSample {
605                    not_hdl : u32::from_ne_bytes(data[stamp_header_offset..stamp_header_offset + 4]
606                                                        .try_into()
607                                                        .unwrap_or_default()),
608
609                    sample_size : u32::from_ne_bytes(data[stamp_header_offset + 4 ..stamp_header_offset + 8]
610                                                        .try_into()
611                                                        .unwrap_or_default())
612                };
613
614                if (not_sample == AdsNotificationSample::default()){
615                    info!("No data in AdsNotificationSample - skip");
616                    continue;
617                }
618
619                stamp_header_offset += LEN_NOT_SAMPLE_MIN;
620
621                if (stamp_header_offset + not_sample.sample_size as usize) > max_stamp_header_offset {
622                    info!("[B] Not enough data in available in stream");
623                    return;
624                }
625
626                let mut _cb_and_data : Option<(Notification, Option<Arc<Mutex<BytesMut>>>)> = None;
627                
628                // The callback must be called after the lock. 
629                // If it is called during the lock, it could block the access to the notification handles infinitely.
630
631                { // LOCK
632                    let mut _not_handles = not_register.lock().expect("Threading Error");
633                    let mut _iter = _not_handles.iter_mut();
634                    
635                    _cb_and_data = _iter.find( | hdl | hdl.not_hdl  == not_sample.not_hdl)
636                            .and_then(| hdl : &mut NotHandle | Some( (hdl.callback, hdl.user_data.clone()) ) ); // Return callback and user data
637                } // UNLOCK
638                
639                
640                _cb_and_data.and_then(|(callback, user_data)| {
641                    let payload = Bytes::from(data.slice(stamp_header_offset..stamp_header_offset + not_sample.sample_size as usize));
642                    // let n_cnt = u16::from_ne_bytes(payload[..].try_into().expect("Failed to parse data")); // DEBUG
643
644                    Some(
645                            rt.spawn(async move  {
646                            callback(not_sample.not_hdl, stamp_header.timestamp, payload, user_data);
647                        })
648                    )
649                    
650                }); // Process join handles?
651
652                stamp_header_offset += not_sample.sample_size as usize;
653            } // for idx_notification_sample in 0..stamp_header.samples
654        } // for idx_stamp_header in 0..stamps
655    }
656}