thread_safe_cache/
network.rs1use std::hash::Hash;
2use tokio::net::TcpStream;
3use crate::{ThreadSafeCacheTrait};
4use serde_derive::{Serialize,Deserialize};
5use std::io;
6use tokio::io::AsyncWriteExt;
7
8pub struct NetworkCache<K: Eq + Hash + serde::de::DeserializeOwned, V: serde::de::DeserializeOwned> {
9 pub tcp_stream: TcpStream,
10 pub rt: tokio::runtime::Runtime,
11 pub phantom_data: std::marker::PhantomData<(K, V)>,
12
13}
14
15impl <K: std::marker::Send + 'static + Clone + Eq + Hash + serde::Serialize + serde::de::DeserializeOwned,
16 V: std::marker::Send + Clone + serde::Serialize + serde::de::DeserializeOwned +'static> ThreadSafeCacheTrait<K, V> for NetworkCache<K, V> {
17 fn put(&mut self, key: K, val: V)
18 where K: Eq + Hash,
19 {
20 self.rt.block_on(async {
22 self.tcp_stream.writable().await.unwrap();
23 let params = PutOpParams {
24 key: key,
25 val: val,
26 };
27 let mut encoded: Vec<u8> = bincode::serialize(¶ms).unwrap();
28 let mut op_code:Vec<u8> = vec![CacheOp::Put as u8];
29 op_code.append(&mut encoded);
30 let ret = self.tcp_stream.write_all(op_code.as_slice()).await;
31 self.tcp_stream.readable().await.unwrap();
32 let mut buf = Vec::with_capacity(4096);
33 loop {
34 match self.tcp_stream.try_read_buf(&mut buf) {
35 Ok(0) => {
36 panic!("closed");
37 },
38 Ok(n) => {
39 break;
41 }
42 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
43 continue;
44 }
45 Err(e) => {
46 panic!("{}",e);
47 }
48 }
49 }
50
51 });
52 }
53 fn put_exp(&mut self, key: K, val: V, expiration: i32)
54 where K: Eq + Hash + Clone,
55 {
56 }
57 fn get(&mut self, key: K) -> Option<V>
58 where K: Eq + Hash, V: Clone
59 {
60
61 let ret = self.rt.block_on(async {
62 self.tcp_stream.writable().await.unwrap();
63 let params = GetOpParams {
64 key: key,
65 };
66 let mut encoded: Vec<u8> = bincode::serialize(¶ms).unwrap();
67 let mut op_code:Vec<u8> = vec![CacheOp::Get as u8];
68 op_code.append(&mut encoded);
69
70 self.tcp_stream.write_all(op_code.as_slice()).await;
71
72 self.tcp_stream.readable().await.unwrap();
73 let mut buf = Vec::with_capacity(4096);
74 let mut ret:Option<V>;
75 loop {
76 match self.tcp_stream.try_read_buf(&mut buf) {
77 Ok(0) => {
78 panic!("closed");
79 },
80 Ok(n) => {
81 let ret_get: GetRet<V> = bincode::deserialize(&buf[0..n]).unwrap();
82 ret = ret_get.val;
83 break;
84 }
85 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
86 continue;
87 }
88 Err(e) => {
89 panic!("{}",e);
90 }
91 }
92 }
93 ret
94 });
95 ret
96 }
97 fn exists(&mut self, key: K) -> bool
98 where K: Eq + Hash, V: Clone
99 {
100 false
101 }
102 fn rm(&mut self, key: K)
103 where K: Eq + Hash,
104 {
105 }
106
107}
108
109pub enum CacheOp {
110 Put = 1,
111 PutExp = 2,
112 Get = 3,
113 Exists = 4,
114 Rm = 5,
115}
116
117#[derive(Serialize, Deserialize)]
118pub struct PutOpParams<K,V> {
119 pub key: K,
120 pub val: V
121}
122#[derive(Serialize, Deserialize)]
123pub struct GetOpParams<K> {
124 pub key: K
125}
126#[derive(Serialize, Deserialize)]
127pub struct GetRet<V> {
128 pub val: Option<V>,
129}