pub struct RedisConnectionManager { /* private fields */ }
Expand description
A bb8::ManageConnection
for redis::Client::get_async_connection
wrapped in a helper type
for namespacing.
Implementations§
Source§impl RedisConnectionManager
impl RedisConnectionManager
Sourcepub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError>
pub fn new<T: IntoConnectionInfo>(info: T) -> Result<Self, RedisError>
Create a new RedisConnectionManager
.
See redis::Client::open
for a description of the parameter types.
Examples found in repository?
examples/namespaced_demo.rs (line 22)
18async fn main() -> Result<()> {
19 tracing_subscriber::fmt::init();
20
21 // Redis
22 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
23 let redis = Pool::builder()
24 .max_size(100)
25 .connection_customizer(sidekiq::with_custom_namespace("yolo_app".to_string()))
26 .build(manager)
27 .await?;
28
29 tokio::spawn({
30 let redis = redis.clone();
31
32 async move {
33 loop {
34 HelloWorker::perform_async(&redis, ()).await.unwrap();
35
36 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
37 }
38 }
39 });
40
41 // Sidekiq server
42 let mut p = Processor::new(redis.clone(), vec!["default".to_string()]);
43
44 // Add known workers
45 p.register(HelloWorker);
46
47 // Start!
48 p.run().await;
49 Ok(())
50}
More examples
examples/unique.rs (line 33)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
examples/demo.rs (line 118)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
examples/consumer-demo.rs (line 95)
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}
examples/producer-demo.rs (line 97)
93async fn main() -> Result<()> {
94 tracing_subscriber::fmt::init();
95
96 // Redis
97 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
98 let redis = Pool::builder().build(manager).await?;
99
100 let mut n = 0;
101 let mut last = 0;
102 let mut then = std::time::Instant::now();
103
104 loop {
105 PaymentReportWorker::perform_async(
106 &redis,
107 PaymentReportArgs {
108 user_guid: "USR-123".into(),
109 },
110 )
111 .await
112 .unwrap();
113
114 //tokio::time::sleep(std::time::Duration::from_millis(1)).await;
115
116 n += 1;
117
118 if n % 100000 == 0 {
119 let now = std::time::Instant::now();
120 let delta = n - last;
121 last = n;
122 let delta_time = now - then;
123 if delta_time.as_secs() == 0 {
124 continue;
125 }
126 then = now;
127 let rate = delta / delta_time.as_secs();
128 println!("Iterations since last: {delta} at a rate of: {rate} iter/sec");
129 }
130 }
131
132 // // Enqueue a job with the worker! There are many ways to do this.
133 // PaymentReportWorker::perform_async(
134 // &mut redis,
135 // PaymentReportArgs {
136 // user_guid: "USR-123".into(),
137 // },
138 // )
139 // .await?;
140 //
141 // PaymentReportWorker::perform_in(
142 // &mut redis,
143 // std::time::Duration::from_secs(10),
144 // PaymentReportArgs {
145 // user_guid: "USR-123".into(),
146 // },
147 // )
148 // .await?;
149 //
150 // PaymentReportWorker::opts()
151 // .queue("brolo")
152 // .perform_async(
153 // &mut redis,
154 // PaymentReportArgs {
155 // user_guid: "USR-123-EXPIRED".into(),
156 // },
157 // )
158 // .await?;
159 //
160 // sidekiq::perform_async(
161 // &mut redis,
162 // "PaymentReportWorker".into(),
163 // "yolo".into(),
164 // PaymentReportArgs {
165 // user_guid: "USR-123".to_string(),
166 // },
167 // )
168 // .await?;
169 //
170 // // Enqueue a job
171 // sidekiq::perform_async(
172 // &mut redis,
173 // "PaymentReportWorker".into(),
174 // "yolo".into(),
175 // PaymentReportArgs {
176 // user_guid: "USR-123".to_string(),
177 // },
178 // )
179 // .await?;
180 //
181 // // Enqueue a job with options
182 // sidekiq::opts()
183 // .queue("yolo".to_string())
184 // .perform_async(
185 // &mut redis,
186 // "PaymentReportWorker".into(),
187 // PaymentReportArgs {
188 // user_guid: "USR-123".to_string(),
189 // },
190 // )
191 // .await?;
192
193 // // Sidekiq server
194 // let mut p = Processor::new(
195 // redis.clone(),
196 // logger.clone(),
197 // //vec!["yolo".to_string(), "brolo".to_string()],
198 // vec![],
199 // );
200 //
201 // // // Add known workers
202 // // p.register(HelloWorker);
203 // // p.register(PaymentReportWorker::new(logger.clone()));
204 // //
205 // // Custom Middlewares
206 // p.using(FilterExpiredUsersMiddleware::new(logger.clone()))
207 // .await;
208 //
209 // // Reset cron jobs
210 // periodic::destroy_all(redis.clone()).await?;
211 //
212 // // Cron jobs
213 // periodic::builder("0 * * * * *")?
214 // .name("Payment report processing for a random user")
215 // .queue("yolo")
216 // //.args(PaymentReportArgs {
217 // // user_guid: "USR-123-PERIODIC".to_string(),
218 // //})?
219 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
220 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
221 // .await?;
222 //
223 // p.run().await;
224}
Trait Implementations§
Source§impl Clone for RedisConnectionManager
impl Clone for RedisConnectionManager
Source§fn clone(&self) -> RedisConnectionManager
fn clone(&self) -> RedisConnectionManager
Returns a copy of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source
. Read moreSource§impl Debug for RedisConnectionManager
impl Debug for RedisConnectionManager
Source§impl ManageConnection for RedisConnectionManager
impl ManageConnection for RedisConnectionManager
Source§type Connection = RedisConnection
type Connection = RedisConnection
The connection type this manager deals with.
Source§type Error = RedisError
type Error = RedisError
The error type returned by
Connection
s.Source§async fn connect(&self) -> Result<Self::Connection, Self::Error>
async fn connect(&self) -> Result<Self::Connection, Self::Error>
Attempts to create a new connection.
Source§async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>
Determines if the connection is still connected to the database.
Source§fn has_broken(&self, _conn: &mut Self::Connection) -> bool
fn has_broken(&self, _conn: &mut Self::Connection) -> bool
Synchronously determine if the connection is no longer usable, if possible.
Auto Trait Implementations§
impl Freeze for RedisConnectionManager
impl RefUnwindSafe for RedisConnectionManager
impl Send for RedisConnectionManager
impl Sync for RedisConnectionManager
impl Unpin for RedisConnectionManager
impl UnwindSafe for RedisConnectionManager
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more