thread_safe_cache/
network.rs

1use std::hash::Hash;
2use tokio::net::TcpStream;
3use crate::{ThreadSafeCacheTrait};
4use serde_derive::{Serialize,Deserialize};
5use std::io;
6use tokio::io::AsyncWriteExt;
7
8pub struct NetworkCache<K: Eq + Hash + serde::de::DeserializeOwned, V: serde::de::DeserializeOwned> {
9    pub tcp_stream: TcpStream,
10    pub rt: tokio::runtime::Runtime,
11    pub phantom_data: std::marker::PhantomData<(K, V)>,
12
13}
14
15impl <K: std::marker::Send  + 'static + Clone +  Eq + Hash + serde::Serialize + serde::de::DeserializeOwned,
16    V: std::marker::Send  + Clone + serde::Serialize + serde::de::DeserializeOwned +'static> ThreadSafeCacheTrait<K, V> for NetworkCache<K, V> {
17    fn put(&mut self, key: K, val: V)
18        where K: Eq + Hash,
19    {
20        // println!("put");
21       self.rt.block_on(async {
22            self.tcp_stream.writable().await.unwrap();
23            let params = PutOpParams {
24                key: key,
25                val: val,
26            };
27           let mut encoded: Vec<u8> = bincode::serialize(&params).unwrap();
28           let mut op_code:Vec<u8> = vec![CacheOp::Put as u8];
29           op_code.append(&mut encoded);
30           let ret = self.tcp_stream.write_all(op_code.as_slice()).await;
31           self.tcp_stream.readable().await.unwrap();
32           let mut buf = Vec::with_capacity(4096);
33           loop {
34               match self.tcp_stream.try_read_buf(&mut buf) {
35                   Ok(0) => {
36                       panic!("closed");
37                   },
38                   Ok(n) => {
39                       // println!(".{}",n);
40                       break;
41                   }
42                   Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
43                       continue;
44                   }
45                   Err(e) => {
46                       panic!("{}",e);
47                   }
48               }
49           }
50
51        });
52    }
53    fn put_exp(&mut self, key: K, val: V, expiration: i32)
54        where K: Eq + Hash + Clone,
55    {
56    }
57    fn get(&mut self, key: K) -> Option<V>
58        where K: Eq + Hash, V: Clone
59    {
60
61        let ret = self.rt.block_on(async {
62            self.tcp_stream.writable().await.unwrap();
63            let params = GetOpParams {
64                key: key,
65            };
66            let mut encoded: Vec<u8> = bincode::serialize(&params).unwrap();
67            let mut op_code:Vec<u8> = vec![CacheOp::Get as u8];
68            op_code.append(&mut encoded);
69
70            self.tcp_stream.write_all(op_code.as_slice()).await;
71
72            self.tcp_stream.readable().await.unwrap();
73            let mut buf = Vec::with_capacity(4096);
74            let mut ret:Option<V>;
75            loop {
76                match self.tcp_stream.try_read_buf(&mut buf) {
77                    Ok(0) => {
78                        panic!("closed");
79                    },
80                    Ok(n) => {
81                        let ret_get: GetRet<V> = bincode::deserialize(&buf[0..n]).unwrap();
82                        ret = ret_get.val;
83                        break;
84                    }
85                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
86                        continue;
87                    }
88                    Err(e) => {
89                        panic!("{}",e);
90                    }
91                }
92            }
93            ret
94        });
95        ret
96    }
97    fn exists(&mut self, key: K) -> bool
98        where K: Eq + Hash, V: Clone
99    {
100        false
101    }
102    fn rm(&mut self, key: K)
103        where K: Eq + Hash,
104    {
105    }
106
107}
108
/// Operation code sent as the first byte of every request.
///
/// `#[repr(u8)]` pins the discriminant to a single byte, so the `as u8` cast
/// used when building a request can never truncate: a discriminant that does
/// not fit in a byte is rejected at compile time.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheOp {
    /// Store a value under a key.
    Put = 1,
    /// Store a value under a key with an expiration.
    PutExp = 2,
    /// Look up the value stored under a key.
    Get = 3,
    /// Test whether a key is present.
    Exists = 4,
    /// Remove a key.
    Rm = 5,
}
116
117#[derive(Serialize, Deserialize)]
118pub struct PutOpParams<K,V> {
119    pub key: K,
120    pub val: V
121}
122#[derive(Serialize, Deserialize)]
123pub struct GetOpParams<K> {
124    pub key: K
125}
126#[derive(Serialize, Deserialize)]
127pub struct GetRet<V> {
128    pub val: Option<V>,
129}