openai_core/pagination/
mod.rs1use std::collections::BTreeMap;
4use std::pin::Pin;
5
6use async_stream::try_stream;
7use futures_util::Stream;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::client::PageRequestSpec;
12use crate::error::{Error, Result};
13#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
16pub struct ListEnvelope<T> {
17 #[serde(default)]
19 pub object: String,
20 #[serde(default)]
22 pub data: Vec<T>,
23 pub first_id: Option<String>,
25 pub last_id: Option<String>,
27 #[serde(default)]
29 pub has_more: bool,
30 #[serde(flatten)]
32 pub extra: BTreeMap<String, Value>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
38pub struct Page<T> {
39 #[serde(default)]
41 pub object: String,
42 #[serde(default)]
44 pub data: Vec<T>,
45 #[serde(flatten)]
47 pub extra: BTreeMap<String, Value>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct CursorPage<T> {
53 #[serde(default)]
55 pub object: String,
56 #[serde(default)]
58 pub data: Vec<T>,
59 pub first_id: Option<String>,
61 pub last_id: Option<String>,
63 #[serde(default)]
65 pub has_more: bool,
66 #[serde(flatten)]
68 pub extra: BTreeMap<String, Value>,
69 #[serde(skip)]
71 pub(crate) next: Option<PageRequestSpec>,
72}
73
74impl<T> Default for CursorPage<T> {
75 fn default() -> Self {
76 Self {
77 object: String::new(),
78 data: Vec::new(),
79 first_id: None,
80 last_id: None,
81 has_more: false,
82 extra: BTreeMap::new(),
83 next: None,
84 }
85 }
86}
87
88impl<T> From<ListEnvelope<T>> for CursorPage<T> {
89 fn from(value: ListEnvelope<T>) -> Self {
90 Self {
91 object: value.object,
92 data: value.data,
93 first_id: value.first_id,
94 last_id: value.last_id,
95 has_more: value.has_more,
96 extra: value.extra,
97 next: None,
98 }
99 }
100}
101
102impl<T> CursorPage<T>
103where
104 T: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
105{
106 pub fn with_next_request(mut self, next: Option<PageRequestSpec>) -> Self {
108 self.next = next;
109 self
110 }
111
112 pub fn has_next_page(&self) -> bool {
114 self.has_more && self.next.is_some()
115 }
116
117 pub async fn next_page(&self) -> Result<Self> {
123 let next = self
124 .next
125 .clone()
126 .ok_or_else(|| Error::InvalidConfig("当前页面没有下一页游标".into()))?;
127 let client = next.client.clone();
128 client.fetch_cursor_page(next).await
129 }
130
131 #[allow(tail_expr_drop_order)]
133 pub fn into_stream(self) -> PageStream<T> {
134 Box::pin(try_stream! {
135 let mut current = Some(self);
136
137 while let Some(page) = current.take() {
138 for item in &page.data {
139 yield item.clone();
140 }
141
142 if page.has_next_page() {
143 current = Some(page.next_page().await?);
144 }
145 }
146 })
147 }
148}
149
/// Heap-allocated, pinned, `Send` stream whose items are `Result<T>`; this is
/// the type returned by `CursorPage::into_stream`.
pub type PageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;