Skip to main content

ice_rs/
adapter.rs

1use crate::errors::*;
2use crate::proxy_parser::*;
3use crate::iceobject::*;
4use crate::protocol::*;
5use crate::encoding::*;
6use std::collections::BTreeMap;
7use tokio::net::TcpListener;
8use tokio::net::TcpStream;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10
11
12pub struct Adapter {
13    endpoint: DirectProxyData,
14    objects: BTreeMap<String, Box<dyn IceObjectServer + Send + Sync>>
15}
16
17impl Adapter {
18    pub fn with_endpoint(name: &str, endpoint: &str) -> Result<Adapter, Box<dyn std::error::Error + Sync + Send>> {
19        let endpoint = parse_proxy_string(&format!("{}:{}", name, endpoint))?;
20        let endpoint = match endpoint {
21            ProxyStringType::DirectProxy(endpoint) => {
22                endpoint
23            }
24            _ => {
25                return Err(Box::new(ProtocolError::new("Direct proxy required for endpoint")))
26            }
27        };
28
29        Ok(Adapter{
30            endpoint,
31            objects: BTreeMap::new()
32        })
33    }
34
35    pub fn add(&mut self, ident: &str, object: Box<dyn IceObjectServer + Send + Sync>) {
36        self.objects.insert(String::from(ident), object);
37    }
38
39    pub async fn activate(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
40        let listener = match &self.endpoint.endpoint {
41            EndPointType::TCP(data) => {
42                TcpListener::bind(format!("{}:{}", data.host, data.port)).await?
43            },
44            _ => {
45                return Err(Box::new(ProtocolError::new("Direct proxy required for endpoint")))
46            }
47        };
48
49        loop {
50            let (mut socket, _) = listener.accept().await?;
51            self.handle_socket(&mut socket).await?;
52        }
53    }
54
55    pub async fn handle_socket(&mut self, stream: &mut TcpStream) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
56        let mut buffer = [0u8; 4096];
57
58        let header = Header::new(3, 14);
59        let mut bytes = header.to_bytes()?;
60        stream.write(&mut bytes).await?;
61        loop {
62            let bytes = stream.read(&mut buffer).await?;
63            let mut read = 0;
64            let header = Header::from_bytes(&buffer[0..bytes], &mut read)?;
65            match header.message_type {
66                0 => {
67                    let req = RequestData::from_bytes(&buffer[read as usize..bytes], &mut read)?;
68                    let reply = if let Some(object) = self.objects.get_mut(&req.id.name) {
69                        match object.handle_request(&req).await {
70                            Ok(reply) => reply,
71                            Err(e) => {
72                                ReplyData {
73                                    request_id: req.request_id,
74                                    status: 1,
75                                    body: Encapsulation::from(e.to_string().as_bytes().to_vec())
76                                }
77                            }
78                        }
79                    } else {
80                        ReplyData {
81                            request_id: req.request_id,
82                            status: 1,
83                            body: Encapsulation::from(String::from("Object not found").as_bytes().to_vec())
84                        }
85                    };
86        
87                    let header = Header::new(2, reply.body.size + 19);
88                    let mut return_buffer = header.to_bytes()?;
89                    return_buffer.extend(reply.to_bytes()?);
90                    stream.write(&mut return_buffer).await?;                
91                }
92                4 => {
93                    return Ok(())
94                }
95                _ => {
96                    return Err(Box::new(ProtocolError::new("Unsupported message type")))
97                }
98            }
99            
100        }
101    }
102}