1pub mod enqueue;
2pub mod get;
3pub mod instances;
4pub mod list;
5pub mod logs;
6pub mod message;
7pub mod publish;
8pub mod queue;
9pub mod selector;
10pub mod spawn;
11pub mod tags;
12pub mod wait;
13
14#[derive(clap::Subcommand)]
15pub enum Command {
16 Enqueue(enqueue::Command),
19 Get(get::Command),
21 Instances {
24 #[command(subcommand)]
25 command: instances::Command,
26 },
27 List(list::Command),
29 Logs {
31 #[command(subcommand)]
32 command: logs::Command,
33 },
34 Message(message::Command),
37 Publish(publish::Command),
39 Queue {
42 #[command(subcommand)]
43 command: queue::Command,
44 },
45 Spawn(spawn::Command),
48 Tags {
50 #[command(subcommand)]
51 command: tags::Command,
52 },
53 Wait(wait::Command),
56}
57
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
59#[serde(untagged)]
60#[schemars(rename = "cli.command.agents.Request")]
61pub enum Request {
62 #[schemars(title = "Enqueue")]
63 Enqueue(enqueue::Request),
64 #[schemars(title = "EnqueueRequestSchema")]
65 EnqueueRequestSchema(enqueue::request_schema::Request),
66 #[schemars(title = "EnqueueResponseSchema")]
67 EnqueueResponseSchema(enqueue::response_schema::Request),
68 #[schemars(title = "Get")]
69 Get(get::Request),
70 #[schemars(title = "GetRequestSchema")]
71 GetRequestSchema(get::request_schema::Request),
72 #[schemars(title = "GetResponseSchema")]
73 GetResponseSchema(get::response_schema::Request),
74 #[schemars(title = "Instances")]
75 Instances(instances::Request),
76 #[schemars(title = "List")]
77 List(list::Request),
78 #[schemars(title = "ListRequestSchema")]
79 ListRequestSchema(list::request_schema::Request),
80 #[schemars(title = "ListResponseSchema")]
81 ListResponseSchema(list::response_schema::Request),
82 #[schemars(title = "Logs")]
83 Logs(logs::Request),
84 #[schemars(title = "Message")]
85 Message(message::Request),
86 #[schemars(title = "MessageRequestSchema")]
87 MessageRequestSchema(message::request_schema::Request),
88 #[schemars(title = "MessageResponseSchema")]
89 MessageResponseSchema(message::response_schema::Request),
90 #[schemars(title = "Publish")]
91 Publish(publish::Request),
92 #[schemars(title = "PublishRequestSchema")]
93 PublishRequestSchema(publish::request_schema::Request),
94 #[schemars(title = "PublishResponseSchema")]
95 PublishResponseSchema(publish::response_schema::Request),
96 #[schemars(title = "Queue")]
97 Queue(queue::Request),
98 #[schemars(title = "Spawn")]
99 Spawn(spawn::Request),
100 #[schemars(title = "SpawnRequestSchema")]
101 SpawnRequestSchema(spawn::request_schema::Request),
102 #[schemars(title = "SpawnResponseSchema")]
103 SpawnResponseSchema(spawn::response_schema::Request),
104 #[schemars(title = "Tags")]
105 Tags(tags::Request),
106 #[schemars(title = "Wait")]
107 Wait(wait::Request),
108 #[schemars(title = "WaitRequestSchema")]
109 WaitRequestSchema(wait::request_schema::Request),
110 #[schemars(title = "WaitResponseSchema")]
111 WaitResponseSchema(wait::response_schema::Request),
112}
113
114#[objectiveai_sdk_macros::json_schema_ignore]
117#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
118#[schemars(rename = "cli.command.agents.ResponseItem")]
119#[serde(untagged)]
120pub enum ResponseItem {
121 #[schemars(title = "Enqueue")]
122 Enqueue(enqueue::Response),
123 #[schemars(title = "EnqueueRequestSchema")]
124 EnqueueRequestSchema(enqueue::request_schema::Response),
125 #[schemars(title = "EnqueueResponseSchema")]
126 EnqueueResponseSchema(enqueue::response_schema::Response),
127 #[schemars(title = "Get")]
128 Get(get::Response),
129 #[schemars(title = "GetRequestSchema")]
130 GetRequestSchema(get::request_schema::Response),
131 #[schemars(title = "GetResponseSchema")]
132 GetResponseSchema(get::response_schema::Response),
133 #[schemars(title = "Instances")]
134 Instances(instances::ResponseItem),
135 #[schemars(title = "List")]
136 List(list::ResponseItem),
137 #[schemars(title = "ListRequestSchema")]
138 ListRequestSchema(list::request_schema::Response),
139 #[schemars(title = "ListResponseSchema")]
140 ListResponseSchema(list::response_schema::Response),
141 #[schemars(title = "Logs")]
142 Logs(logs::ResponseItem),
143 #[schemars(title = "Message")]
144 Message(message::Response),
145 #[schemars(title = "MessageRequestSchema")]
146 MessageRequestSchema(message::request_schema::Response),
147 #[schemars(title = "MessageResponseSchema")]
148 MessageResponseSchema(message::response_schema::Response),
149 #[schemars(title = "Publish")]
150 Publish(publish::Response),
151 #[schemars(title = "PublishRequestSchema")]
152 PublishRequestSchema(publish::request_schema::Response),
153 #[schemars(title = "PublishResponseSchema")]
154 PublishResponseSchema(publish::response_schema::Response),
155 #[schemars(title = "Queue")]
156 Queue(queue::ResponseItem),
157 #[schemars(title = "Spawn")]
158 Spawn(spawn::ResponseItem),
159 #[schemars(title = "SpawnRequestSchema")]
160 SpawnRequestSchema(spawn::request_schema::Response),
161 #[schemars(title = "SpawnResponseSchema")]
162 SpawnResponseSchema(spawn::response_schema::Response),
163 #[schemars(title = "Tags")]
164 Tags(tags::ResponseItem),
165 #[schemars(title = "Wait")]
166 Wait(wait::Response),
167 #[schemars(title = "WaitRequestSchema")]
168 WaitRequestSchema(wait::request_schema::Response),
169 #[schemars(title = "WaitResponseSchema")]
170 WaitResponseSchema(wait::response_schema::Response),
171}
172
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Converts this item into an MCP response item by delegating to the
    /// `into_mcp` implementation of whichever response the variant wraps.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        // Exhaustive on purpose: a new variant must be handled here explicitly.
        match self {
            Self::Enqueue(item) => item.into_mcp(),
            Self::EnqueueRequestSchema(item) => item.into_mcp(),
            Self::EnqueueResponseSchema(item) => item.into_mcp(),

            Self::Get(item) => item.into_mcp(),
            Self::GetRequestSchema(item) => item.into_mcp(),
            Self::GetResponseSchema(item) => item.into_mcp(),

            Self::Instances(item) => item.into_mcp(),

            Self::List(item) => item.into_mcp(),
            Self::ListRequestSchema(item) => item.into_mcp(),
            Self::ListResponseSchema(item) => item.into_mcp(),

            Self::Logs(item) => item.into_mcp(),

            Self::Message(item) => item.into_mcp(),
            Self::MessageRequestSchema(item) => item.into_mcp(),
            Self::MessageResponseSchema(item) => item.into_mcp(),

            Self::Publish(item) => item.into_mcp(),
            Self::PublishRequestSchema(item) => item.into_mcp(),
            Self::PublishResponseSchema(item) => item.into_mcp(),

            Self::Queue(item) => item.into_mcp(),

            Self::Spawn(item) => item.into_mcp(),
            Self::SpawnRequestSchema(item) => item.into_mcp(),
            Self::SpawnResponseSchema(item) => item.into_mcp(),

            Self::Tags(item) => item.into_mcp(),

            Self::Wait(item) => item.into_mcp(),
            Self::WaitRequestSchema(item) => item.into_mcp(),
            Self::WaitResponseSchema(item) => item.into_mcp(),
        }
    }
}
205
206impl TryFrom<Command> for Request {
207 type Error = crate::cli::command::FromArgsError;
208 fn try_from(command: Command) -> Result<Self, Self::Error> {
209 match command {
210 Command::Enqueue(cmd) => match cmd.schema {
211 None => Ok(Request::Enqueue(enqueue::Request::try_from(cmd.args)?)),
212 Some(enqueue::Schema::RequestSchema(args)) =>
213 Ok(Request::EnqueueRequestSchema(enqueue::request_schema::Request::try_from(args)?)),
214 Some(enqueue::Schema::ResponseSchema(args)) =>
215 Ok(Request::EnqueueResponseSchema(enqueue::response_schema::Request::try_from(args)?)),
216 },
217 Command::Get(cmd) => match cmd.schema {
218 None => Ok(Request::Get(get::Request::try_from(cmd.args)?)),
219 Some(get::Schema::RequestSchema(args)) =>
220 Ok(Request::GetRequestSchema(get::request_schema::Request::try_from(args)?)),
221 Some(get::Schema::ResponseSchema(args)) =>
222 Ok(Request::GetResponseSchema(get::response_schema::Request::try_from(args)?)),
223 },
224 Command::Instances { command } =>
225 Ok(Request::Instances(instances::Request::try_from(command)?)),
226 Command::List(cmd) => match cmd.schema {
227 None => Ok(Request::List(list::Request::try_from(cmd.args)?)),
228 Some(list::Schema::RequestSchema(args)) =>
229 Ok(Request::ListRequestSchema(list::request_schema::Request::try_from(args)?)),
230 Some(list::Schema::ResponseSchema(args)) =>
231 Ok(Request::ListResponseSchema(list::response_schema::Request::try_from(args)?)),
232 },
233 Command::Logs { command } =>
234 Ok(Request::Logs(logs::Request::try_from(command)?)),
235 Command::Message(cmd) => match cmd.schema {
236 None => Ok(Request::Message(message::Request::try_from(cmd.args)?)),
237 Some(message::Schema::RequestSchema(args)) =>
238 Ok(Request::MessageRequestSchema(message::request_schema::Request::try_from(args)?)),
239 Some(message::Schema::ResponseSchema(args)) =>
240 Ok(Request::MessageResponseSchema(message::response_schema::Request::try_from(args)?)),
241 },
242 Command::Publish(cmd) => match cmd.schema {
243 None => Ok(Request::Publish(publish::Request::try_from(cmd.args)?)),
244 Some(publish::Schema::RequestSchema(args)) =>
245 Ok(Request::PublishRequestSchema(publish::request_schema::Request::try_from(args)?)),
246 Some(publish::Schema::ResponseSchema(args)) =>
247 Ok(Request::PublishResponseSchema(publish::response_schema::Request::try_from(args)?)),
248 },
249 Command::Queue { command } =>
250 Ok(Request::Queue(queue::Request::try_from(command)?)),
251 Command::Spawn(cmd) => match cmd.schema {
252 None => Ok(Request::Spawn(spawn::Request::try_from(cmd.args)?)),
253 Some(spawn::Schema::RequestSchema(args)) =>
254 Ok(Request::SpawnRequestSchema(spawn::request_schema::Request::try_from(args)?)),
255 Some(spawn::Schema::ResponseSchema(args)) =>
256 Ok(Request::SpawnResponseSchema(spawn::response_schema::Request::try_from(args)?)),
257 },
258 Command::Tags { command } =>
259 Ok(Request::Tags(tags::Request::try_from(command)?)),
260 Command::Wait(cmd) => match cmd.schema {
261 None => Ok(Request::Wait(wait::Request::try_from(cmd.args)?)),
262 Some(wait::Schema::RequestSchema(args)) =>
263 Ok(Request::WaitRequestSchema(wait::request_schema::Request::try_from(args)?)),
264 Some(wait::Schema::ResponseSchema(args)) =>
265 Ok(Request::WaitResponseSchema(wait::response_schema::Request::try_from(args)?)),
266 },
267 }
268 }
269}
270
271impl crate::cli::command::CommandRequest for Request {
272 fn into_command(&self) -> Vec<String> {
273 match self {
274 Request::Enqueue(inner) => inner.into_command(),
275 Request::EnqueueRequestSchema(inner) => inner.into_command(),
276 Request::EnqueueResponseSchema(inner) => inner.into_command(),
277 Request::Get(inner) => inner.into_command(),
278 Request::GetRequestSchema(inner) => inner.into_command(),
279 Request::GetResponseSchema(inner) => inner.into_command(),
280 Request::Instances(inner) => inner.into_command(),
281 Request::List(inner) => inner.into_command(),
282 Request::ListRequestSchema(inner) => inner.into_command(),
283 Request::ListResponseSchema(inner) => inner.into_command(),
284 Request::Logs(inner) => inner.into_command(),
285 Request::Message(inner) => inner.into_command(),
286 Request::MessageRequestSchema(inner) => inner.into_command(),
287 Request::MessageResponseSchema(inner) => inner.into_command(),
288 Request::Publish(inner) => inner.into_command(),
289 Request::PublishRequestSchema(inner) => inner.into_command(),
290 Request::PublishResponseSchema(inner) => inner.into_command(),
291 Request::Queue(inner) => inner.into_command(),
292 Request::Spawn(inner) => inner.into_command(),
293 Request::SpawnRequestSchema(inner) => inner.into_command(),
294 Request::SpawnResponseSchema(inner) => inner.into_command(),
295 Request::Tags(inner) => inner.into_command(),
296 Request::Wait(inner) => inner.into_command(),
297 Request::WaitRequestSchema(inner) => inner.into_command(),
298 Request::WaitResponseSchema(inner) => inner.into_command(),
299 }
300 }
301
302 fn request_base(&self) -> &crate::cli::command::RequestBase {
303 match self {
304 Request::Enqueue(inner) => inner.request_base(),
305 Request::EnqueueRequestSchema(inner) => inner.request_base(),
306 Request::EnqueueResponseSchema(inner) => inner.request_base(),
307 Request::Get(inner) => inner.request_base(),
308 Request::GetRequestSchema(inner) => inner.request_base(),
309 Request::GetResponseSchema(inner) => inner.request_base(),
310 Request::Instances(inner) => inner.request_base(),
311 Request::List(inner) => inner.request_base(),
312 Request::ListRequestSchema(inner) => inner.request_base(),
313 Request::ListResponseSchema(inner) => inner.request_base(),
314 Request::Logs(inner) => inner.request_base(),
315 Request::Message(inner) => inner.request_base(),
316 Request::MessageRequestSchema(inner) => inner.request_base(),
317 Request::MessageResponseSchema(inner) => inner.request_base(),
318 Request::Publish(inner) => inner.request_base(),
319 Request::PublishRequestSchema(inner) => inner.request_base(),
320 Request::PublishResponseSchema(inner) => inner.request_base(),
321 Request::Queue(inner) => inner.request_base(),
322 Request::Spawn(inner) => inner.request_base(),
323 Request::SpawnRequestSchema(inner) => inner.request_base(),
324 Request::SpawnResponseSchema(inner) => inner.request_base(),
325 Request::Tags(inner) => inner.request_base(),
326 Request::Wait(inner) => inner.request_base(),
327 Request::WaitRequestSchema(inner) => inner.request_base(),
328 Request::WaitResponseSchema(inner) => inner.request_base(),
329 }
330 }
331
332 fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
333 match self {
334 Request::Enqueue(inner) => inner.request_base_mut(),
335 Request::EnqueueRequestSchema(inner) => inner.request_base_mut(),
336 Request::EnqueueResponseSchema(inner) => inner.request_base_mut(),
337 Request::Get(inner) => inner.request_base_mut(),
338 Request::GetRequestSchema(inner) => inner.request_base_mut(),
339 Request::GetResponseSchema(inner) => inner.request_base_mut(),
340 Request::Instances(inner) => inner.request_base_mut(),
341 Request::List(inner) => inner.request_base_mut(),
342 Request::ListRequestSchema(inner) => inner.request_base_mut(),
343 Request::ListResponseSchema(inner) => inner.request_base_mut(),
344 Request::Logs(inner) => inner.request_base_mut(),
345 Request::Message(inner) => inner.request_base_mut(),
346 Request::MessageRequestSchema(inner) => inner.request_base_mut(),
347 Request::MessageResponseSchema(inner) => inner.request_base_mut(),
348 Request::Publish(inner) => inner.request_base_mut(),
349 Request::PublishRequestSchema(inner) => inner.request_base_mut(),
350 Request::PublishResponseSchema(inner) => inner.request_base_mut(),
351 Request::Queue(inner) => inner.request_base_mut(),
352 Request::Spawn(inner) => inner.request_base_mut(),
353 Request::SpawnRequestSchema(inner) => inner.request_base_mut(),
354 Request::SpawnResponseSchema(inner) => inner.request_base_mut(),
355 Request::Tags(inner) => inner.request_base_mut(),
356 Request::Wait(inner) => inner.request_base_mut(),
357 Request::WaitRequestSchema(inner) => inner.request_base_mut(),
358 Request::WaitResponseSchema(inner) => inner.request_base_mut(),
359 }
360 }
361}
362
/// Executes an `agents` request and returns its results as a boxed stream of
/// [`ResponseItem`]s.
///
/// Commands whose submodule `execute` yields a single value are wrapped in a
/// `StreamOnce`. `Instances`, `List`, `Logs`, `Queue` and `Tags` forward the
/// submodule's stream, tagging each successful item with the matching
/// `ResponseItem` variant. `Spawn` streams only when
/// `dangerous_advanced.stream` is `Some(true)`; otherwise it yields a single
/// `spawn::ResponseItem::Id`.
///
/// # Errors
///
/// Returns `E::Error` when the selected submodule call fails before a stream
/// is available. Errors produced later are delivered as `Err` stream items.
#[cfg(feature = "cli-executor")]
pub async fn execute<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<
    std::pin::Pin<Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>>,
    E::Error,
> {
    use crate::cli::command::StreamOnce;
    use futures::StreamExt;

    // Shorthand for the boxed stream type this function returns.
    type Items<Err> =
        std::pin::Pin<Box<dyn futures::Stream<Item = Result<ResponseItem, Err>> + Send>>;

    // Wraps one already-computed item in a stream that yields exactly it.
    let single = |item: ResponseItem| -> Items<E::Error> { Box::pin(StreamOnce::new(Ok(item))) };

    let stream: Items<E::Error> = match request {
        // --- enqueue ---
        Request::Enqueue(req) => single(ResponseItem::Enqueue(
            enqueue::execute(executor, req, agent_arguments).await?,
        )),
        Request::EnqueueRequestSchema(req) => single(ResponseItem::EnqueueRequestSchema(
            enqueue::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::EnqueueResponseSchema(req) => single(ResponseItem::EnqueueResponseSchema(
            enqueue::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- get ---
        Request::Get(req) => single(ResponseItem::Get(
            get::execute(executor, req, agent_arguments).await?,
        )),
        Request::GetRequestSchema(req) => single(ResponseItem::GetRequestSchema(
            get::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::GetResponseSchema(req) => single(ResponseItem::GetResponseSchema(
            get::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- instances (streamed) ---
        Request::Instances(req) => Box::pin(
            instances::execute(executor, req, agent_arguments)
                .await?
                .map(|item| item.map(ResponseItem::Instances)),
        ),

        // --- list (streamed) ---
        Request::List(req) => Box::pin(
            list::execute(executor, req, agent_arguments)
                .await?
                .map(|item| item.map(ResponseItem::List)),
        ),
        Request::ListRequestSchema(req) => single(ResponseItem::ListRequestSchema(
            list::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::ListResponseSchema(req) => single(ResponseItem::ListResponseSchema(
            list::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- logs (streamed) ---
        Request::Logs(req) => Box::pin(
            logs::execute(executor, req, agent_arguments)
                .await?
                .map(|item| item.map(ResponseItem::Logs)),
        ),

        // --- message ---
        Request::Message(req) => single(ResponseItem::Message(
            message::execute(executor, req, agent_arguments).await?,
        )),
        Request::MessageRequestSchema(req) => single(ResponseItem::MessageRequestSchema(
            message::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::MessageResponseSchema(req) => single(ResponseItem::MessageResponseSchema(
            message::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- publish ---
        Request::Publish(req) => single(ResponseItem::Publish(
            publish::execute(executor, req, agent_arguments).await?,
        )),
        Request::PublishRequestSchema(req) => single(ResponseItem::PublishRequestSchema(
            publish::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::PublishResponseSchema(req) => single(ResponseItem::PublishResponseSchema(
            publish::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- queue (streamed) ---
        Request::Queue(req) => Box::pin(
            queue::execute(executor, req, agent_arguments)
                .await?
                .map(|item| item.map(ResponseItem::Queue)),
        ),

        // --- spawn ---
        Request::Spawn(req) => {
            // Streaming is opt-in: an absent `dangerous_advanced` or an absent
            // `stream` flag both select the single-response path.
            let streaming = matches!(
                req.dangerous_advanced.as_ref().and_then(|adv| adv.stream),
                Some(true)
            );
            if streaming {
                Box::pin(
                    spawn::execute_streaming(executor, req, agent_arguments)
                        .await?
                        .map(|item| item.map(ResponseItem::Spawn)),
                )
            } else {
                single(ResponseItem::Spawn(spawn::ResponseItem::Id(
                    spawn::execute(executor, req, agent_arguments).await?,
                )))
            }
        }
        Request::SpawnRequestSchema(req) => single(ResponseItem::SpawnRequestSchema(
            spawn::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::SpawnResponseSchema(req) => single(ResponseItem::SpawnResponseSchema(
            spawn::response_schema::execute(executor, req, agent_arguments).await?,
        )),

        // --- tags (streamed) ---
        Request::Tags(req) => Box::pin(
            tags::execute(executor, req, agent_arguments)
                .await?
                .map(|item| item.map(ResponseItem::Tags)),
        ),

        // --- wait ---
        Request::Wait(req) => single(ResponseItem::Wait(
            wait::execute(executor, req, agent_arguments).await?,
        )),
        Request::WaitRequestSchema(req) => single(ResponseItem::WaitRequestSchema(
            wait::request_schema::execute(executor, req, agent_arguments).await?,
        )),
        Request::WaitResponseSchema(req) => single(ResponseItem::WaitResponseSchema(
            wait::response_schema::execute(executor, req, agent_arguments).await?,
        )),
    };

    Ok(stream)
}
529
/// Executes an `agents` request with `transform` passed to the selected
/// submodule, returning the results as a boxed stream of JSON values.
///
/// Commands whose submodule `execute_transform` yields a single value are
/// wrapped in a `StreamOnce`. `Instances`, `List`, `Logs`, `Queue` and `Tags`
/// return the submodule's stream as-is. `Spawn` uses
/// `spawn::execute_streaming_transform` only when `dangerous_advanced.stream`
/// is `Some(true)`, and `spawn::execute_transform` otherwise.
///
/// # Errors
///
/// Returns `E::Error` when the selected submodule call fails before a stream
/// is available. Errors produced later are delivered as `Err` stream items.
#[cfg(feature = "cli-executor")]
pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    transform: crate::cli::command::Transform,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<
    std::pin::Pin<Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>>,
    E::Error,
> {
    use crate::cli::command::StreamOnce;

    // Shorthand for the boxed stream type this function returns.
    type Values<Err> =
        std::pin::Pin<Box<dyn futures::Stream<Item = Result<serde_json::Value, Err>> + Send>>;

    // Wraps one already-computed value in a stream that yields exactly it.
    let single =
        |value: serde_json::Value| -> Values<E::Error> { Box::pin(StreamOnce::new(Ok(value))) };

    let stream: Values<E::Error> = match request {
        // --- enqueue ---
        Request::Enqueue(req) => {
            single(enqueue::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::EnqueueRequestSchema(req) => single(
            enqueue::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::EnqueueResponseSchema(req) => single(
            enqueue::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- get ---
        Request::Get(req) => {
            single(get::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::GetRequestSchema(req) => single(
            get::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::GetResponseSchema(req) => single(
            get::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- instances (streamed) ---
        Request::Instances(req) => Box::pin(
            instances::execute_transform(executor, req, transform, agent_arguments).await?,
        ),

        // --- list (streamed) ---
        Request::List(req) => {
            Box::pin(list::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::ListRequestSchema(req) => single(
            list::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::ListResponseSchema(req) => single(
            list::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- logs (streamed) ---
        Request::Logs(req) => {
            Box::pin(logs::execute_transform(executor, req, transform, agent_arguments).await?)
        }

        // --- message ---
        Request::Message(req) => {
            single(message::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::MessageRequestSchema(req) => single(
            message::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::MessageResponseSchema(req) => single(
            message::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- publish ---
        Request::Publish(req) => {
            single(publish::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::PublishRequestSchema(req) => single(
            publish::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::PublishResponseSchema(req) => single(
            publish::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- queue (streamed) ---
        Request::Queue(req) => {
            Box::pin(queue::execute_transform(executor, req, transform, agent_arguments).await?)
        }

        // --- spawn ---
        Request::Spawn(req) => {
            // Streaming is opt-in: an absent `dangerous_advanced` or an absent
            // `stream` flag both select the single-response path.
            let streaming = matches!(
                req.dangerous_advanced.as_ref().and_then(|adv| adv.stream),
                Some(true)
            );
            if streaming {
                Box::pin(
                    spawn::execute_streaming_transform(executor, req, transform, agent_arguments)
                        .await?,
                )
            } else {
                single(spawn::execute_transform(executor, req, transform, agent_arguments).await?)
            }
        }
        Request::SpawnRequestSchema(req) => single(
            spawn::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::SpawnResponseSchema(req) => single(
            spawn::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),

        // --- tags (streamed) ---
        Request::Tags(req) => {
            Box::pin(tags::execute_transform(executor, req, transform, agent_arguments).await?)
        }

        // --- wait ---
        Request::Wait(req) => {
            single(wait::execute_transform(executor, req, transform, agent_arguments).await?)
        }
        Request::WaitRequestSchema(req) => single(
            wait::request_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
        Request::WaitResponseSchema(req) => single(
            wait::response_schema::execute_transform(executor, req, transform, agent_arguments)
                .await?,
        ),
    };

    Ok(stream)
}