1#![allow(unused)]
27
28#[macro_use]
29mod misc;
30mod command_manager;
31mod command_cleaner;
32mod ads_read;
33mod ads_write;
34mod ads_read_state;
35mod ads_read_write;
36mod ads_add_device_notification;
37mod ads_delete_device_notification;
38mod ads_write_control;
39mod ads_read_device_info;
40
41use std::time::{Instant, Duration};
42use std::io;
43use std::net::SocketAddr;
44use std::mem::size_of_val;
45use std::sync::{Arc, Mutex, atomic::{AtomicU16, Ordering}};
46use tokio::net::TcpStream;
47use tokio::{runtime, stream};
48use tokio::io::{ReadHalf, WriteHalf};
49use tokio::io::{AsyncWriteExt, AsyncReadExt};
50use tokio::time::sleep;
51use log::{trace, debug, info, warn, error};
52use bytes::{Bytes, BytesMut};
53
54use command_cleaner::CommandCleaner;
55use command_manager::CommandManager;
56
57use misc::{AdsCommand, Handle, HandleData, NotHandle, AmsNetId, AdsStampHeader, AdsNotificationSample};
58pub use misc::{AdsTimeout, AdsNotificationAttrib, AdsTransMode, StateInfo, DeviceStateInfo, AdsState, Notification, Result, AdsError, AdsErrorCode}; const HEADER_SIZE : usize = 38;
64const AMS_HEADER_SIZE : usize = HEADER_SIZE - 6; const LEN_READ_REQ : usize = 12;
66const LEN_RW_REQ_MIN : usize = 16;
67const LEN_W_REQ_MIN : usize = 12;
68const LEN_ADD_DEV_NOT : usize = 38;
69const LEN_STAMP_HEADER_MIN : usize = 12; const LEN_NOT_SAMPLE_MIN : usize = 8; const LEN_DEL_DEV_NOT : usize = 4;
72const LEN_WR_CTRL_MIN : usize = 8;
73
74enum ProcessStateMachine{
75 ReadHeader,
76 ReadPayload { len_payload: usize, err_code: u32, invoke_id: u32, cmd: AdsCommand}
77}
78
79#[derive(Debug)]
80pub struct ClientBuilder<'a> {
81 addr: &'a str,
82 port: u16,
83 timeout: AdsTimeout,
84 retry_delay: Option<Duration>,
85}
86
87impl<'a> ClientBuilder<'a> {
88 pub fn new(addr: &'a str, port: u16) -> Self {
89 Self { addr, port, timeout: AdsTimeout::DefaultTimeout, retry_delay: None }
90 }
91
92 pub fn set_timeout(mut self, timeout: AdsTimeout) -> Self {
93 self.timeout = timeout;
94 self
95 }
96
97 pub fn set_retry_delay(mut self, retry_delay: Option<Duration>) -> Self {
98 self.retry_delay = retry_delay;
99 self
100 }
101
102 pub async fn build(self) -> Result<Client> {
103 Client::new(self.addr, self.port, self.timeout, self.retry_delay).await
104 }
105}
106
107#[derive(Debug)]
112pub struct Client {
113 _dst_addr : AmsNetId,
114 _dst_port : u16,
115 _src_addr : AmsNetId,
116 _src_port : u16,
117 timeout : u64, socket_wrt : Arc<Mutex<WriteHalf<TcpStream>>>,
119 handles : Arc<Mutex<Vec<Handle>>>, not_handles : Arc<Mutex<Vec<NotHandle>>>,
121 ams_header : [u8; HEADER_SIZE],
122 hdl_cnt : Arc<AtomicU16>
123}
124
125impl Client {
130
131 async fn connect(answer: &mut [u8]) -> Result<TcpStream> {
132 let stream = TcpStream::connect(&SocketAddr::from(([127, 0, 0, 1], 48898))).await.map_err::<AdsError, _>(|err| err.into() )?;
133 let handshake : [u8; 8] = [0x00, 0x10, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00 ];
134
135 loop {
137 stream.writable().await.map_err::<AdsError, _>(|err| err.into() )?;
139
140 match stream.try_write(&handshake) {
143 Ok(_) => {
144 break;
145 }
146 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
147 warn!("TcpStream: false positive reaction / stream was not yet ready for reading {:?}", e);
148 continue;
149 }
150 Err(e) => {
151 error!("Failed to write to socket");
152 return Err(e.into());
153 }
154 }
155 }
156
157 loop {
159 stream.readable().await?;
161
162 match stream.try_read(answer) {
165 Ok(0) => break,
166 Ok(n) => {
167 if n == 14 {
168 info!("Connection to AMS router established");
169 break;
170 } else {
171 error!("Router port disabled – TwinCAT system service not started.");
172 return Err(AdsError{n_error : 18, s_msg : String::from("Port disabled – TwinCAT system service not started.")});
173 }
174 }
175 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176 warn!("TcpStream: false positive reaction / stream was not yet ready for writing {:?}", e);
177 continue;
178 }
179 Err(_) => {
180 error!("Router port disabled – TwinCAT system service not started.");
181 return Err(AdsError{n_error : 18, s_msg : String::from("Port disabled – TwinCAT system service not started.")});
182 }
183 }
184 }
185
186 Ok(stream)
187 }
188
189 async fn process_response(handles: Arc<Mutex<Vec<Handle>>>, not_handles: Arc<Mutex<Vec<NotHandle>>>, mut rd_stream : ReadHalf<TcpStream>, retry_delay: Option<Duration>) {
190
191 let mut state = ProcessStateMachine::ReadHeader;
192 let rt = runtime::Handle::current();
193
194 loop {
195 match &mut state {
196
197 ProcessStateMachine::ReadHeader => {
198
199 let mut header_buf : [u8; HEADER_SIZE] = [0; HEADER_SIZE];
200
201 match rd_stream.read(&mut header_buf).await {
202 Ok(0) => {
203 warn!("[0] Incoming ADS response - no bytes to read");
204 }
205 Ok(_) => {
206 let len_payload = Client::extract_length(&header_buf).unwrap_or_default();
207 let err_code = Client::extract_error_code(&header_buf).unwrap_or_default();
208 let invoke_id = Client::extract_invoke_id(&header_buf).unwrap_or_default();
209 let ads_cmd = Client::extract_cmd_tyte(&header_buf).unwrap_or_default();
210
211 if(len_payload == 0){
212 warn!("Invoke id {}: No ADS payload available - skip", invoke_id);
213 continue;
214 }
215
216 trace!("[0] Incoming ADS response with {:?} byte payload", len_payload);
217
218 state = ProcessStateMachine::ReadPayload{
219 len_payload : len_payload,
220 err_code : err_code,
221 invoke_id : invoke_id,
222 cmd : ads_cmd
223 };
224
225 }
226 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
227 warn!("TcpStream: false positive reaction / stream was not yet ready for reading{:?}", e);
228 continue;
229 }
230 Err(e) => {
231 error!("Socket Error (0x1): {:?}", e);
232 if let Some(ref delay) = retry_delay {
234 sleep(*delay).await;
235 }
236 }
237 }
238 }
239
240 ProcessStateMachine::ReadPayload {len_payload, err_code, invoke_id, cmd} => {
241
242 let mut payload = BytesMut::with_capacity(*len_payload);
243
244 match rd_stream.read_buf(&mut payload).await {
245 Ok(0) => {
246 info!("[1] ADS response {:?}, Invoke ID: {:?}: - zero payload", cmd, invoke_id);
247 state = ProcessStateMachine::ReadHeader;
248 }
249 Ok(_) => {
250
251 let buf = payload.freeze(); match cmd {
253 AdsCommand::DeviceNotification => {
254 trace!("[1] Processing device notification");
255 let _not_handles = Arc::clone(¬_handles);
256 rt.spawn(Client::process_device_notification(_not_handles, buf));
257
258 },
259 _ => {
260 trace!("[1] Processing ADS response");
261 let _handles = Arc::clone(&handles);
262 rt.spawn(Client::process_command(*err_code, *invoke_id, _handles, buf));
263 }
264
265 };
266
267 state = ProcessStateMachine::ReadHeader;
268 }
269 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
270 warn!("ADS command {:?}, Invoke ID: {:?}: - WouldBlock error during reading occured", cmd, invoke_id);
271 continue;
272 }
273 Err(e) => {
274 error!("ADS command {:?}, Invoke ID: {:?}: - Error occurred: {:?}", cmd, invoke_id, e);
275 if let Some(ref delay) = retry_delay {
277 sleep(*delay).await;
278 }
279 }
280 } }
282 } } } async fn socket_write(&self, data: &[u8] ) -> Result<()> {
287
288 let a_wrt_stream = Arc::clone(&self.socket_wrt);
289 {
290 let mut wrt_stream = a_wrt_stream.lock();
291
292 match wrt_stream {
293 Ok(ref mut stream) => {
294 stream.write(data).await?;
295 },
296 Err(_) => {
297 return Err( AdsError { n_error : 10, s_msg : String::from("Writing to Tcp Stream socket failed") } );
298 }
299 }
300 }
301 Ok(())
303 }
304
305 async fn new(addr : &str, port : u16, timeout : AdsTimeout, retry_delay: Option<Duration>) -> Result<Self> {
321 let state_flag : u16 = 4;
322 let error_code : u32 = 0;
323 let mut b_vec = Vec::<u8>::new();
324
325 for s_byte in addr.split('.') {
327 let n_byte = s_byte.parse::<u8>()?;
329 b_vec.push(n_byte);
330 }
331
332 let timeout = match timeout {
333 AdsTimeout::DefaultTimeout => 5,
334 AdsTimeout::CustomTimeout(time) => time
335 };
336
337 let hdl_rt = runtime::Handle::current();
338
339 let mut answer : [u8; 14] = [0; 14];
340
341 let _stream = Client::connect(&mut answer).await?;
342 info!("ADS client port opened: {}", u16::from_ne_bytes(answer[12..14].try_into().unwrap_or_default()));
343
344 let (read, write) = tokio::io::split(_stream);
350
351 let a_socket_wrt = Arc::new(Mutex::new(write));
352
353 let a_handles = Arc::new(Mutex::new( Vec::<Handle>::new() ));
355 let a_not_handles = Arc::new(Mutex::new( Vec::<NotHandle>::new() ));
356
357 let response_vector_a = Arc::clone(&a_handles);
359 let not_response_vector_a = Arc::clone(&a_not_handles);
360 hdl_rt.spawn(Client::process_response(response_vector_a, not_response_vector_a, read, retry_delay));
361
362 let response_vector_b = Arc::clone(&a_handles);
364 hdl_rt.spawn(CommandCleaner::new(1, timeout, response_vector_b));
365
366 Ok(Self {
367 _dst_addr : b_vec.clone().try_into().expect("AmsNetId consist of exact 6 bytes"), _dst_port : port,
369 _src_addr : [answer[6], answer[7], answer[8], answer[9], answer[10], answer[11]],
370 _src_port : u16::from_ne_bytes(answer[12..14].try_into().expect("Parsing source port failed")),
371 timeout : timeout,
372 socket_wrt : a_socket_wrt,
373 handles : a_handles,
374 not_handles : a_not_handles,
375 ams_header : [
376 0, 0,
378 0, 0,
380 0,
381 0,
382 b_vec[0], b_vec[1],
384 b_vec[2],
385 b_vec[3],
386 b_vec[4],
387 b_vec[5],
388 u16_low_byte!(port), u16_high_byte!(port),
390 answer[6], answer[7],
392 answer[8],
393 answer[9],
394 answer[10],
395 answer[11],
396 answer[12], answer[13],
398 0, 0,
400 u16_low_byte!(state_flag), u16_high_byte!(state_flag),
402 0, 0,
404 0,
405 0,
406 u32_lw_lb!(error_code), u32_lw_hb!(error_code),
408 u32_hw_lb!(error_code),
409 u32_hw_hb!(error_code),
410 0, 0,
412 0,
413 0
414 ],
415 hdl_cnt : Arc::new(AtomicU16::new(1))
416 })
417 }
418
419 fn register_command_handle(&self, invoke_id : u32, cmd : AdsCommand){
420 let a_handles = Arc::clone(&self.handles);
421
422 let rs_req_hdl = Handle {
423 cmd_type : cmd,
424 invoke_id : invoke_id,
425 data : HandleData::default(),
426 timestamp : Instant::now(),
427 };
428
429 {
430 let mut handles = a_handles.lock().expect("Threading Error");
431 handles.push(rs_req_hdl);
432 }
433 }
434
435 fn register_not_handle(&self, not_hdl: u32, callback: Notification, user_data: Option<&Arc<Mutex<BytesMut>>>) {
436 let a_not_handles = Arc::clone(&self.not_handles);
437
438 let not_hdl = NotHandle {
439 callback : callback,
440 not_hdl : not_hdl,
441 user_data : user_data.and_then(|arc_bytes| Some(Arc::clone(arc_bytes)) )
442 };
443
444 {
445 let mut not_handles = a_not_handles.lock().expect("Threading Error");
446 not_handles.push(not_hdl);
447 }
448 }
449
450 fn create_cmd_man_future(&self, invoke_id: u32) -> CommandManager {
451 let a_handles = Arc::clone(&self.handles);
452 CommandManager::new(self.timeout, invoke_id, a_handles)
453 }
454
455 fn create_invoke_id(&self) -> u32 {
456 u32::from(self.hdl_cnt.fetch_add(1, Ordering::SeqCst))
457 }
458
459 fn c_init_ams_header(&self, invoke_id : u32, length_payload : Option<u32>, cmd : AdsCommand) -> [u8; HEADER_SIZE] {
460 let length_payload = length_payload.unwrap_or(0);
461 let length_header : u32 = AMS_HEADER_SIZE as u32 + length_payload;
462
463 let mut ams_header : [u8; HEADER_SIZE] = self.ams_header;
464 ams_header[2..6].copy_from_slice(&length_header.to_ne_bytes());
466 ams_header[22..24].copy_from_slice(&(cmd as u16).to_ne_bytes());
468 ams_header[26..30].copy_from_slice(&length_payload.to_ne_bytes());
470 ams_header[34..38].copy_from_slice(&invoke_id.to_ne_bytes());
472
473 ams_header
474 }
475
476 fn eval_return_code(answer: &[u8]) -> Result<u32> {
477 let ret_code = u32::from_ne_bytes(answer[0..4].try_into()?);
478
479 if ret_code != 0 {
480 Err(AdsError{ n_error : ret_code, s_msg : String::from("Errorcode of ADS response") }) } else {
482 Ok(ret_code)
483 }
484 }
485
486 fn eval_ams_error(ams_err : u32) -> Result<()> {
487 if ams_err != 0 {
488 return Err(AdsError{n_error : ams_err, s_msg : String::from("Errorcode of ADS response") });
489 }
490 Ok(())
491 }
492
493 fn extract_error_code(answer: &[u8]) -> Result<u32> {
494 Ok(u32::from_ne_bytes(answer[HEADER_SIZE-8..HEADER_SIZE-4].try_into()?))
495 }
496
497 fn extract_invoke_id(answer: &[u8]) -> Result<u32> {
498 Ok(u32::from_ne_bytes(answer[HEADER_SIZE-4..HEADER_SIZE].try_into()?))
499 }
500
501 fn extract_cmd_tyte(answer: &[u8]) -> Result<AdsCommand>{
502 u16::from_ne_bytes(answer[HEADER_SIZE-16..HEADER_SIZE-14].try_into()?).try_into()
503 }
504
505 fn extract_length(answer: &[u8]) -> Result<usize>{
506 let tmp = u32::from_ne_bytes(answer[HEADER_SIZE-12..HEADER_SIZE-8].try_into()?);
508 Ok(usize::try_from(tmp)?)
510 }
511
512 fn not_extract_length(answer: &[u8]) -> Result<usize>{
513 let tmp = u32::from_ne_bytes(answer[0..4].try_into()?);
514 Ok(usize::try_from(tmp)?)
515 }
516
517 fn not_extract_stamps(answer: &[u8]) -> Result<u32>{
519 Ok(u32::from_ne_bytes(answer[4..8].try_into()?))
520 }
521
522 async fn process_command(err_code: u32, invoke_id: u32, cmd_register: Arc<Mutex<Vec<Handle>>>, data: Bytes){
523 trace!("[2] AdsCmd: Invoke ID: {}", invoke_id);
524
525 match cmd_register.lock() {
526 Ok(mut h) => {
527
528 if let Some(hdl) = h.iter_mut().find( | hdl | hdl.invoke_id == invoke_id) {
529 hdl.data.payload = Some(data);
530 hdl.data.ams_err = err_code;
531 } else {
532 warn!("No corresponding invoke ID found in CMD register - response will expire");
533 }
534
535 },
536 Err(e) => {
537 error!("Failed to lock command register - response dropped");
538 return;
539 }
540 };
541 }
542
543 async fn process_device_notification(not_register: Arc<Mutex<Vec<NotHandle>>>, data: Bytes){
544 trace!("[2] Start processing AdsDeviceNotification");
545 let stream_length = match Client::not_extract_length(&data){
546 Ok(size) => size,
547 Err(e) => {
548 error!("Failed to extract notification length - Notification dropped - {:?}", e);
549 return;
550 }
551 };
552
553 let no_stamps = match Client::not_extract_stamps(&data){
554 Ok(stamps) => stamps,
555 Err(e) => {
556 error!("Failed to extract number of stamps- Notification dropped - {:?}", e);
557 return;
558 }
559 };
560
561 let rt = runtime::Handle::current();
562 let max_stamp_header_offset = stream_length + size_of_val(&no_stamps);
567 let mut stamp_header_offset : usize = 8; for _ in 0..no_stamps { if (stamp_header_offset + LEN_STAMP_HEADER_MIN) > max_stamp_header_offset {
573 info!("Received Device Notification without sample data");
574 continue;
575 }
576
577
578 let stamp_header = AdsStampHeader {
579 timestamp : u64::from_ne_bytes(data[stamp_header_offset.. stamp_header_offset + 8]
580 .try_into()
581 .unwrap_or_default()),
582
583 samples : u32::from_ne_bytes(data[stamp_header_offset + 8..stamp_header_offset + 12]
584 .try_into()
585 .unwrap_or_default())
586 };
587
588 if (stamp_header == AdsStampHeader::default()){
589 info!("Empty AdsStampHeader - Continue with next stamp");
590 continue;
591 }
592
593 stamp_header_offset += LEN_STAMP_HEADER_MIN;
595 for _ in 0..stamp_header.samples {
598 if (stamp_header_offset + LEN_NOT_SAMPLE_MIN) > max_stamp_header_offset {
600 info!("[A] Not enough data in available in stream");
601 return;
602 }
603
604 let not_sample = AdsNotificationSample {
605 not_hdl : u32::from_ne_bytes(data[stamp_header_offset..stamp_header_offset + 4]
606 .try_into()
607 .unwrap_or_default()),
608
609 sample_size : u32::from_ne_bytes(data[stamp_header_offset + 4 ..stamp_header_offset + 8]
610 .try_into()
611 .unwrap_or_default())
612 };
613
614 if (not_sample == AdsNotificationSample::default()){
615 info!("No data in AdsNotificationSample - skip");
616 continue;
617 }
618
619 stamp_header_offset += LEN_NOT_SAMPLE_MIN;
620
621 if (stamp_header_offset + not_sample.sample_size as usize) > max_stamp_header_offset {
622 info!("[B] Not enough data in available in stream");
623 return;
624 }
625
626 let mut _cb_and_data : Option<(Notification, Option<Arc<Mutex<BytesMut>>>)> = None;
627
628 { let mut _not_handles = not_register.lock().expect("Threading Error");
633 let mut _iter = _not_handles.iter_mut();
634
635 _cb_and_data = _iter.find( | hdl | hdl.not_hdl == not_sample.not_hdl)
636 .and_then(| hdl : &mut NotHandle | Some( (hdl.callback, hdl.user_data.clone()) ) ); } _cb_and_data.and_then(|(callback, user_data)| {
641 let payload = Bytes::from(data.slice(stamp_header_offset..stamp_header_offset + not_sample.sample_size as usize));
642 Some(
645 rt.spawn(async move {
646 callback(not_sample.not_hdl, stamp_header.timestamp, payload, user_data);
647 })
648 )
649
650 }); stamp_header_offset += not_sample.sample_size as usize;
653 } } }
656}