pastry_dht/node.rs
1use std::net::SocketAddr;
2
3use tonic::Request;
4
5use crate::{
6 error::*,
7 internal::{
8 dht::{node::Node, service::grpc::*},
9 hring::hasher::Sha256Hasher,
10 pastry::shared::Config,
11 },
12};
13
14/// An instance of a Pastry node.
15///
16#[derive(Clone)]
17pub struct PastryNode {
18 node: Node,
19}
20
21// Basic Node methods
22impl PastryNode {
23 /// Registers a new Pastry node which will be available publicly on
24 /// http://hostname:port
25 ///
26 /// # Arguments
27 ///
28 /// * `config` - The Pastry network configuration.
29 /// * `addr` - The address of the socket to listen on.
30 /// * `pub_addr` - The address the node will be exposed on.
31 ///
32 /// # Returns
33 ///
34 /// A Result containing the newly registered node.
35 ///
36 pub fn new(config: Config, addr: SocketAddr, pub_addr: SocketAddr) -> Result<Self> {
37 Ok(PastryNode {
38 node: Node::new(config, addr, pub_addr)?,
39 })
40 }
41
42 /// Connects to Pastry network via bootstrap node and serves node server.
43 /// Consumes node.
44 ///
45 /// # Arguments
46 ///
47 /// * `bootstrap_addr` - A bootstrap node address.
48 ///
49 /// # Returns
50 ///
51 /// An empty Result
52 ///
53 pub async fn bootstrap_and_serve(self, bootstrap_addr: Option<&str>) -> Result<()> {
54 self.node.bootstrap_and_serve(bootstrap_addr).await?.await?
55 }
56
57 /// Gets the internal Pastry node ID.
58 ///
59 pub fn get_id(&self) -> u64 {
60 self.node.id
61 }
62
63 /// Gets the public Pastry node address.
64 ///
65 pub fn get_public_address(&self) -> String {
66 self.node.pub_addr.clone()
67 }
68}
69
70// gRPC methods
71impl PastryNode {
72 /// Retrieves a value associated with the given key stored in the Pastry
73 /// network.
74 ///
75 /// # Arguments
76 ///
77 /// * `key` - A slice of bytes representing the key for which the value is
78 /// requested.
79 ///
80 /// # Returns
81 ///
82 /// Returns a `Result` which is:
83 ///
84 /// - `Ok(Some(Vec<u8>))` if the key exists, containing the associated
85 /// value.
86 /// - `Ok(None)` if the key does not exist.
87 /// - `Err(e)` where `e` encapsulates any error encountered during the
88 /// operation.
89 ///
90 pub async fn get_kv(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
91 let response = self
92 .node
93 .query(Request::new(QueryRequest {
94 from_id: 0,
95 matched_digits: 0,
96 hops: 0,
97 query_type: QueryType::Get.into(),
98 key: Sha256Hasher::hash_once(key),
99 value: None,
100 }))
101 .await?
102 .into_inner();
103
104 Ok(response.value)
105 }
106
107 /// Sets a value for a given key in the Pastry network.
108 ///
109 /// # Arguments
110 ///
111 /// * `key` - A slice of bytes representing the key to which the value is
112 /// to be associated.
113 /// * `value` - A slice of bytes representing the value to be set.
114 ///
115 /// # Returns
116 ///
117 /// Returns a `Result` which is:
118 ///
119 /// - `Ok(Some(Vec<u8>))` if the key existed and the value was replaced,
120 /// containing the old value.
121 /// - `Ok(None)` if the key did not exist and a new entry was created.
122 /// - `Err(e)` where `e` encapsulates any error encountered during the
123 /// operation.
124 ///
125 pub async fn set_kv(&self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
126 let response = self
127 .node
128 .query(Request::new(QueryRequest {
129 from_id: 0,
130 matched_digits: 0,
131 hops: 0,
132 query_type: QueryType::Set.into(),
133 key: Sha256Hasher::hash_once(key),
134 value: Some(value.to_vec()),
135 }))
136 .await?
137 .into_inner();
138
139 Ok(response.value)
140 }
141
142 /// Deletes the value associated with the given key in the Pastry network.
143 ///
144 /// # Arguments
145 ///
146 /// * `key` - A slice of bytes representing the key whose associated value
147 /// is to be deleted.
148 ///
149 /// # Returns
150 ///
151 /// Returns a `Result` which is:
152 ///
153 /// - `Ok(Some(Vec<u8>))` if the key existed and the value was successfully deleted, containing the deleted value.
154 /// - `Ok(None)` if the key did not exist.
155 /// - `Err(e)` where `e` encapsulates any error encountered during the operation.
156 ///
157 pub async fn delete_kv(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
158 let response = self
159 .node
160 .query(Request::new(QueryRequest {
161 from_id: 0,
162 matched_digits: 0,
163 hops: 0,
164 query_type: QueryType::Delete.into(),
165 key: Sha256Hasher::hash_once(key),
166 value: None,
167 }))
168 .await?
169 .into_inner();
170
171 Ok(response.value)
172 }
173}