openstack/object_storage/
objects.rs1use std::collections::HashMap;
18
19use async_trait::async_trait;
20use chrono::{DateTime, TimeZone};
21use futures::io::AsyncRead;
22use futures::{Stream, TryStreamExt};
23use osauth::services::OBJECT_STORAGE;
24use reqwest::Url;
25
26use super::super::common::{ContainerRef, ObjectRef, Refresh};
27use super::super::session::Session;
28use super::super::utils::{try_one, Query};
29use super::super::Result;
30use super::{api, protocol};
31
32#[derive(Clone, Debug)]
34pub struct ObjectQuery {
35 session: Session,
36 c_name: String,
37 query: Query,
38 limit: Option<usize>,
39 marker: Option<String>,
40}
41
42#[derive(Debug)]
44pub struct NewObject<R> {
45 session: Session,
46 c_name: ContainerRef,
47 name: String,
48 body: R,
49 headers: ObjectHeaders,
50}
51
52#[derive(Debug, Default)]
54pub struct ObjectHeaders {
55 pub delete_after: Option<u32>,
56 pub delete_at: Option<i64>,
57 pub metadata: HashMap<String, String>,
58}
59
60#[derive(Clone, Debug)]
62pub struct Object {
63 session: Session,
64 inner: protocol::Object,
65 c_name: String,
66}
67
68impl Object {
69 pub(crate) fn new(session: Session, inner: protocol::Object, c_name: String) -> Object {
71 Object {
72 session,
73 inner,
74 c_name,
75 }
76 }
77
78 pub(crate) async fn create<C, Id, R>(
80 session: Session,
81 container: C,
82 name: Id,
83 body: R,
84 ) -> Result<Object>
85 where
86 C: Into<ContainerRef>,
87 Id: AsRef<str>,
88 R: AsyncRead + Sync + Send + 'static,
89 {
90 let new_object = NewObject::new(
91 session,
92 container.into(),
93 name.as_ref().to_string(),
95 body,
96 );
97 new_object.create().await
98 }
99
100 pub(crate) async fn load<C, Id>(session: Session, container: C, name: Id) -> Result<Object>
102 where
103 C: Into<ContainerRef>,
104 Id: AsRef<str>,
105 {
106 let c_ref = container.into();
107 let c_name = c_ref.to_string();
108 let inner = api::get_object(&session, c_ref, name).await?;
109 Ok(Object::new(session, inner, c_name))
110 }
111
112 #[inline]
114 pub async fn delete(self) -> Result<()> {
115 api::delete_object(&self.session, &self.c_name, self.inner.name).await
116 }
117
118 #[inline]
122 pub async fn download(&self) -> Result<impl AsyncRead + Send + '_> {
123 api::download_object(&self.session, &self.c_name, &self.inner.name).await
124 }
125
126 transparent_property! {
127 #[doc = "Total size of the object."]
128 bytes: u64
129 }
130
131 #[inline]
133 pub fn container_name(&self) -> &String {
134 &self.c_name
135 }
136
137 transparent_property! {
138 #[doc = "Object content type (if set)."]
139 content_type: ref Option<String>
140 }
141
142 transparent_property! {
143 #[doc = "Object hash or ETag, which is a content's md5 hash"]
144 hash: ref Option<String>
145 }
146
147 transparent_property! {
148 #[doc = "Object name."]
149 name: ref String
150 }
151
152 #[inline]
154 pub async fn url(&self) -> Result<Url> {
155 self.session
156 .get_endpoint(OBJECT_STORAGE, &[self.container_name(), self.name()])
157 .await
158 }
159}
160
161#[async_trait]
162impl Refresh for Object {
163 async fn refresh(&mut self) -> Result<()> {
165 self.inner = api::get_object(&self.session, &self.c_name, &self.inner.name).await?;
166 Ok(())
167 }
168}
169
170impl ObjectQuery {
171 pub(crate) fn new<C: Into<ContainerRef>>(session: Session, container: C) -> ObjectQuery {
172 ObjectQuery {
173 session,
174 c_name: container.into().into(),
175 query: Query::new(),
176 limit: None,
177 marker: None,
178 }
179 }
180
181 pub fn with_marker<T: Into<String>>(mut self, marker: T) -> Self {
183 self.marker = Some(marker.into());
184 self
185 }
186
187 pub fn with_limit(mut self, limit: usize) -> Self {
189 self.limit = Some(limit);
190 self
191 }
192
193 pub async fn into_stream(self) -> Result<impl Stream<Item = Result<Object>>> {
195 debug!(
196 "Fetching objects in container {} with {:?}",
197 self.c_name, self.query
198 );
199 Ok(api::list_objects(
200 &self.session,
201 self.c_name.clone(),
202 self.query,
203 self.limit,
204 self.marker,
205 )
206 .await?
207 .map_ok({
208 let session = self.session;
209 let c_name = self.c_name;
210 move |obj| Object::new(session.clone(), obj, c_name.clone())
211 }))
212 }
213
214 pub async fn all(self) -> Result<Vec<Object>> {
218 self.into_stream().await?.try_collect().await
219 }
220
221 pub async fn one(mut self) -> Result<Object> {
226 debug!(
227 "Fetching one object in container {} with {:?}",
228 self.c_name, self.query
229 );
230 self.limit = Some(2);
233 try_one(self.into_stream().await?).await
234 }
235}
236
237impl<R: AsyncRead + Sync + Send + 'static> NewObject<R> {
238 pub(crate) fn new(
240 session: Session,
241 c_name: ContainerRef,
242 name: String,
243 body: R,
244 ) -> NewObject<R> {
245 NewObject {
246 session,
247 c_name,
248 name,
249 body,
250 headers: ObjectHeaders::default(),
251 }
252 }
253
254 pub async fn create(self) -> Result<Object> {
256 let c_name = self.c_name.clone();
257
258 let inner = api::create_object(
259 &self.session,
260 self.c_name,
261 self.name,
262 self.body,
263 self.headers,
264 )
265 .await?;
266
267 Ok(Object::new(self.session, inner, c_name.into()))
268 }
269
270 #[inline]
272 pub fn metadata(&mut self) -> &mut HashMap<String, String> {
273 &mut self.headers.metadata
274 }
275
276 #[inline]
278 pub fn with_delete_after(mut self, ttl: u32) -> NewObject<R> {
279 self.headers.delete_after = Some(ttl);
280 self
281 }
282
283 #[inline]
285 pub fn with_delete_at<T: TimeZone>(mut self, datetime: DateTime<T>) -> NewObject<R> {
286 self.headers.delete_at = Some(datetime.timestamp());
287 self
288 }
289
290 #[inline]
292 pub fn with_metadata<K, V>(mut self, key: K, item: V) -> NewObject<R>
293 where
294 K: Into<String>,
295 V: Into<String>,
296 {
297 let _ = self.headers.metadata.insert(key.into(), item.into());
298 self
299 }
300}
301
302impl From<Object> for ObjectRef {
303 fn from(value: Object) -> ObjectRef {
304 ObjectRef::new_verified(value.inner.name)
305 }
306}
307
308#[cfg(feature = "object-storage")]
309impl ObjectRef {
310 #[allow(unused)]
311 pub(crate) async fn into_verified(self, _session: &Session) -> Result<Self> {
312 Ok(self)
313 }
314}