openstack/object_storage/
objects.rs

1// Copyright 2019 Dmitry Tantsur <divius.inside@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Stored objects.
16
17use std::collections::HashMap;
18
19use async_trait::async_trait;
20use chrono::{DateTime, TimeZone};
21use futures::io::AsyncRead;
22use futures::{Stream, TryStreamExt};
23use osauth::services::OBJECT_STORAGE;
24use reqwest::Url;
25
26use super::super::common::{ContainerRef, ObjectRef, Refresh};
27use super::super::session::Session;
28use super::super::utils::{try_one, Query};
29use super::super::Result;
30use super::{api, protocol};
31
32/// A query to objects.
33#[derive(Clone, Debug)]
34pub struct ObjectQuery {
35    session: Session,
36    c_name: String,
37    query: Query,
38    limit: Option<usize>,
39    marker: Option<String>,
40}
41
42/// A request to create an object.
43#[derive(Debug)]
44pub struct NewObject<R> {
45    session: Session,
46    c_name: ContainerRef,
47    name: String,
48    body: R,
49    headers: ObjectHeaders,
50}
51
/// Optional headers that can accompany an object.
///
/// The `Default` value has no expiration settings and empty metadata.
#[derive(Default, Debug)]
pub struct ObjectHeaders {
    /// Time to live of the object, in seconds.
    pub delete_after: Option<u32>,
    /// Timestamp at which the object must be deleted.
    ///
    /// `NewObject::with_delete_at` fills this from `DateTime::timestamp`,
    /// i.e. seconds since the Unix epoch.
    pub delete_at: Option<i64>,
    /// Metadata items to set on the object, keyed by item name.
    pub metadata: HashMap<String, String>,
}
59
60/// Structure representing an object.
61#[derive(Clone, Debug)]
62pub struct Object {
63    session: Session,
64    inner: protocol::Object,
65    c_name: String,
66}
67
68impl Object {
69    /// Create a new Object object.
70    pub(crate) fn new(session: Session, inner: protocol::Object, c_name: String) -> Object {
71        Object {
72            session,
73            inner,
74            c_name,
75        }
76    }
77
78    /// Create an object.
79    pub(crate) async fn create<C, Id, R>(
80        session: Session,
81        container: C,
82        name: Id,
83        body: R,
84    ) -> Result<Object>
85    where
86        C: Into<ContainerRef>,
87        Id: AsRef<str>,
88        R: AsyncRead + Sync + Send + 'static,
89    {
90        let new_object = NewObject::new(
91            session,
92            container.into(),
93            // TODO(dtantsur): get rid of to_string here.
94            name.as_ref().to_string(),
95            body,
96        );
97        new_object.create().await
98    }
99
100    /// Load an Object.
101    pub(crate) async fn load<C, Id>(session: Session, container: C, name: Id) -> Result<Object>
102    where
103        C: Into<ContainerRef>,
104        Id: AsRef<str>,
105    {
106        let c_ref = container.into();
107        let c_name = c_ref.to_string();
108        let inner = api::get_object(&session, c_ref, name).await?;
109        Ok(Object::new(session, inner, c_name))
110    }
111
112    /// Delete the object.
113    #[inline]
114    pub async fn delete(self) -> Result<()> {
115        api::delete_object(&self.session, &self.c_name, self.inner.name).await
116    }
117
118    /// Download the object.
119    ///
120    /// The object can be read from the resulting reader.
121    #[inline]
122    pub async fn download(&self) -> Result<impl AsyncRead + Send + '_> {
123        api::download_object(&self.session, &self.c_name, &self.inner.name).await
124    }
125
126    transparent_property! {
127        #[doc = "Total size of the object."]
128        bytes: u64
129    }
130
131    /// Container name.
132    #[inline]
133    pub fn container_name(&self) -> &String {
134        &self.c_name
135    }
136
137    transparent_property! {
138        #[doc = "Object content type (if set)."]
139        content_type: ref Option<String>
140    }
141
142    transparent_property! {
143        #[doc = "Object hash or ETag, which is a content's md5 hash"]
144        hash: ref Option<String>
145    }
146
147    transparent_property! {
148        #[doc = "Object name."]
149        name: ref String
150    }
151
152    /// Object url.
153    #[inline]
154    pub async fn url(&self) -> Result<Url> {
155        self.session
156            .get_endpoint(OBJECT_STORAGE, &[self.container_name(), self.name()])
157            .await
158    }
159}
160
161#[async_trait]
162impl Refresh for Object {
163    /// Refresh the object.
164    async fn refresh(&mut self) -> Result<()> {
165        self.inner = api::get_object(&self.session, &self.c_name, &self.inner.name).await?;
166        Ok(())
167    }
168}
169
170impl ObjectQuery {
171    pub(crate) fn new<C: Into<ContainerRef>>(session: Session, container: C) -> ObjectQuery {
172        ObjectQuery {
173            session,
174            c_name: container.into().into(),
175            query: Query::new(),
176            limit: None,
177            marker: None,
178        }
179    }
180
181    /// Add marker to the request.
182    pub fn with_marker<T: Into<String>>(mut self, marker: T) -> Self {
183        self.marker = Some(marker.into());
184        self
185    }
186
187    /// Add limit to the request.
188    pub fn with_limit(mut self, limit: usize) -> Self {
189        self.limit = Some(limit);
190        self
191    }
192
193    /// Convert this query into a stream of objects.
194    pub async fn into_stream(self) -> Result<impl Stream<Item = Result<Object>>> {
195        debug!(
196            "Fetching objects in container {} with {:?}",
197            self.c_name, self.query
198        );
199        Ok(api::list_objects(
200            &self.session,
201            self.c_name.clone(),
202            self.query,
203            self.limit,
204            self.marker,
205        )
206        .await?
207        .map_ok({
208            let session = self.session;
209            let c_name = self.c_name;
210            move |obj| Object::new(session.clone(), obj, c_name.clone())
211        }))
212    }
213
214    /// Execute this request and return all results.
215    ///
216    /// A convenience shortcut for `self.into_iter().collect()`.
217    pub async fn all(self) -> Result<Vec<Object>> {
218        self.into_stream().await?.try_collect().await
219    }
220
221    /// Return one and exactly one result.
222    ///
223    /// Fails with `ResourceNotFound` if the query produces no results and
224    /// with `TooManyItems` if the query produces more than one result.
225    pub async fn one(mut self) -> Result<Object> {
226        debug!(
227            "Fetching one object in container {} with {:?}",
228            self.c_name, self.query
229        );
230        // We need only one result. We fetch maximum two to be able
231        // to check if the query yieled more than one result.
232        self.limit = Some(2);
233        try_one(self.into_stream().await?).await
234    }
235}
236
237impl<R: AsyncRead + Sync + Send + 'static> NewObject<R> {
238    /// Start creating an object.
239    pub(crate) fn new(
240        session: Session,
241        c_name: ContainerRef,
242        name: String,
243        body: R,
244    ) -> NewObject<R> {
245        NewObject {
246            session,
247            c_name,
248            name,
249            body,
250            headers: ObjectHeaders::default(),
251        }
252    }
253
254    /// Request creation of the object.
255    pub async fn create(self) -> Result<Object> {
256        let c_name = self.c_name.clone();
257
258        let inner = api::create_object(
259            &self.session,
260            self.c_name,
261            self.name,
262            self.body,
263            self.headers,
264        )
265        .await?;
266
267        Ok(Object::new(self.session, inner, c_name.into()))
268    }
269
270    /// Metadata to set on the object.
271    #[inline]
272    pub fn metadata(&mut self) -> &mut HashMap<String, String> {
273        &mut self.headers.metadata
274    }
275
276    /// Set TTL in seconds for the object.
277    #[inline]
278    pub fn with_delete_after(mut self, ttl: u32) -> NewObject<R> {
279        self.headers.delete_after = Some(ttl);
280        self
281    }
282
283    /// Set the date and time when the object must be deleted.
284    #[inline]
285    pub fn with_delete_at<T: TimeZone>(mut self, datetime: DateTime<T>) -> NewObject<R> {
286        self.headers.delete_at = Some(datetime.timestamp());
287        self
288    }
289
290    /// Insert a new metadata item.
291    #[inline]
292    pub fn with_metadata<K, V>(mut self, key: K, item: V) -> NewObject<R>
293    where
294        K: Into<String>,
295        V: Into<String>,
296    {
297        let _ = self.headers.metadata.insert(key.into(), item.into());
298        self
299    }
300}
301
302impl From<Object> for ObjectRef {
303    fn from(value: Object) -> ObjectRef {
304        ObjectRef::new_verified(value.inner.name)
305    }
306}
307
#[cfg(feature = "object-storage")]
impl ObjectRef {
    /// Return this reference unchanged.
    ///
    /// No request is made and the session is not used; the call always
    /// succeeds.
    // NOTE(review): presumably `allow(unused)` is here because not every
    // build configuration calls this method; confirm against callers.
    #[allow(unused)]
    pub(crate) async fn into_verified(self, _session: &Session) -> Result<Self> {
        let verified = self;
        Ok(verified)
    }
}
314}